【问题标题】:Node Read Streams - How can I limit the number of open files?节点读取流 - 如何限制打开文件的数量?
【发布时间】:2022-01-01 23:59:40
【问题描述】:

我在流式传输多个文件时遇到了AggregateError: EMFILE: too many open files

机器详细信息: macOS 蒙特雷, MacBook Pro(14 英寸,2021 年), 芯片 Apple M1 Pro, 内存 16GB, 节点 v16.13.0

我试过增加限制但没有运气。 理想情况下,我希望能够设置一次打开文件数量的限制,或者通过在文件使用后立即关闭来解决。

代码如下。我试图删除不相关的代码并将其替换为'//...'。

const MultiStream = require('multistream');
const fs = require('fs-extra'); // Also tried graceful-fs and the standard fs
const { fdir } = require("fdir");
// Also have a require for the bz2 and split2 functions but editing from phone right now

//...

let files = [];

//...

(async() => {

  const crawler = await new fdir()
  .filter((path, isDirectory) => path.endsWith(".bz2"))
  .withFullPaths()
  .crawl("Dir/Sub Dir")
  .withPromise();

  for(const file of crawler){
    files = [...files, fs.createReadStream(file)]
  }

  multi = await new MultiStream(files)
    // Unzip
    .pipe(bz2())
    // Create chunks from lines
    .pipe(split2())
    .on('data', function (obj) {
      // Code to filter data and extract what I need
      //...
    })
    .on("error", function(error) {
      // Handling parsing errors
      //...
    })
    .on('end', function(error) {
      // Output results
      //...
    })

})();

【问题讨论】:

  • 有什么理由不只是将 readstream 推送到 files 数组中?您还尝试读取多少个文件?凡事都有其局限
  • 有多少个文件?
  • 我的原始代码可以处理一个月的数据,其中包含 28,000 个 NDJSON 文件,但是当我尝试处理包含 323,000 个 NDJSON 文件的一年数据时失败了。现在测试建议的代码:D

标签: node.js file stream fs aggregateerror


【解决方案1】:

为了防止为阵列中的每个文件预先打开文件句柄,您希望仅在轮到该特定文件进行流式传输时按需打开文件。而且,您可以使用多流来做到这一点。

根据多流doc,你可以通过改变这个来懒惰地创建readStreams:

  for(const file of crawler){
    files = [...files, fs.createReadStream(file)]
  }

到这里:

  let files = crawler.map((f) => {
      return function() {
          return fs.createReadStream(f);
      }
  });

【讨论】:

  • dangit 我太慢了。这里地图的使用也很流畅
  • 哇,在代码中进行如此简单的更改并在第一次没有错误的情况下完成了。不必循环浏览文件,也可以减少几分钟。谢谢!
【解决方案2】:

在阅读了multistream 的 npm 页面后,我想我找到了一些有用的东西。我还编辑了您将流添加到文件数组的位置,因为我认为不需要实例化新数组并像您正在做的那样传播现有元素。

要延迟创建流,请将它们包装在一个函数中:

    var streams = [
      fs.createReadStream(__dirname + '/numbers/1.txt'),
      function () { // will be executed when the stream is active
        return fs.createReadStream(__dirname + '/numbers/2.txt')
      },
      function () { // same
        return fs.createReadStream(__dirname + '/numbers/3.txt')
      }
    ]
    
    new MultiStream(streams).pipe(process.stdout) // => 123 ```

这样,我们可以通过简单地将 readStreams 包装在函数中来更新您的逻辑以包含此功能,这样在需要它们之前不会创建流​​。这将防止您一次打开太多。我们可以通过简单地更新您的文件循环来做到这一点:

for(const file of crawler){
    files.push(function() {
        return fs.createReadStream(file)
    })
}

【讨论】:

  • 这也行得通,谢谢!我查看了文档,但对情况的了解不足以意识到这是解决方案。在阅读了有关 graceful-fs 以及它如何尝试解决 EMFILE 错误之后,我确信这就是我出错的地方,而不是关闭文件或其他东西。再次感谢您帮助我了解情况!
猜你喜欢
  • 2014-02-28
  • 2017-06-01
  • 2022-01-25
  • 2018-07-19
  • 2015-10-16
  • 2010-09-07
  • 1970-01-01
  • 1970-01-01
相关资源
最近更新 更多