【问题标题】:Writing large files with Node.js使用 Node.js 编写大文件
【发布时间】:2012-03-18 04:54:51
【问题描述】:

我正在使用 writable stream 用 node.js 编写一个大文件:

var fs     = require('fs');
var stream = fs.createWriteStream('someFile.txt', { flags : 'w' });

var lines;
while (lines = getLines()) {
    for (var i = 0; i < lines.length; i++) {
        stream.write( lines[i] );
    }
}

我想知道这个方案在不使用drain 事件的情况下是否安全?如果不是(我认为是这种情况),将任意大数据写入文件的模式是什么?

【问题讨论】:

标签: node.js large-files


【解决方案1】:

drain 背后的想法是您可以在这里使用它进行测试:

var fs = require('fs');
var stream = fs.createWriteStream('someFile.txt', {flags: 'w'});

var lines;
while (lines = getLines()) {
    for (var i = 0; i < lines.length; i++) {
        stream.write(lines[i]); //<-- the place to test
    }
}

你不是。因此,您需要重新架构以使其“可重入”。

var fs = require('fs');
var stream = fs.createWriteStream('someFile.txt', {flags: 'w'});

var lines;
while (lines = getLines()) {
    for (var i = 0; i < lines.length; i++) {
        var written = stream.write(lines[i]); //<-- the place to test
        if (!written){
           //do something here to wait till you can safely write again
           //this means prepare a buffer and wait till you can come back to finish
           //  lines[i] -> remainder
        }
    }
}

但是,这是否意味着您需要在等待时继续缓冲 getLines?

var fs = require('fs');
var stream = fs.createWriteStream('someFile.txt', {flags: 'w'});

var lines,
    buffer = {
     remainingLines = []
    };
while (lines = getLines()) {
    for (var i = 0; i < lines.length; i++) {
        var written = stream.write(lines[i]); //<-- the place to test
        if (!written){
           //do something here to wait till you can safely write again
           //this means prepare a buffer and wait till you can come back to finish
           //  lines[i] -> remainder
           buffer.remainingLines = lines.slice(i);
           break;
           //notice there's no way to re-run this once we leave here.
        }
    }
}

stream.on('drain',function(){
  if (buffer.remainingLines.length){
    for (var i = 0; i < buffer.remainingLines.length; i++) {
      var written = stream.write(buffer.remainingLines[i]); //<-- the place to test
      if (!written){
       //do something here to wait till you can safely write again
       //this means prepare a buffer and wait till you can come back to finish
       //  lines[i] -> remainder
       buffer.remainingLines = lines.slice(i);
      }
    }
  }
});

【讨论】:

  • 不需要使用自己的缓冲区。 Node.js 已经为您完成了。读取源文件nodejs-source/lib/fs.js#WriteStream.prototype.write
【解决方案2】:

[编辑] 更新后的 Node.js writable.write(...) API docs 说:

[The] 返回值是严格建议性的。你可以继续写,即使它返回假。但是,写入将被缓冲在内存中,因此最好不要过度执行此操作。相反,在写入更多数据之前等待 drain 事件。

[原文]来自stream.write(...) documentation(强调我的):

如果字符串已刷新到内核缓冲区,则返回 true。返回false,表示内核缓冲区已满,数据以后会发送出去

我将此解释为“write”函数返回true如果给定的字符串被立即写入底层操作系统缓冲区或false如果它还没有被写入但是将被写入函数(例如,WriteStream 可能为您缓冲了),因此您不必再次调用“write”。

【讨论】:

  • 但是“以这种方式写入文件描述符时,在流耗尽之前关闭描述符可能会发送无效(关闭的)FD。”让我认为缓冲区已满意味着它不能再接受您的任何代码。老实说,我不知道,在这里只给出了我最好的猜测作为答案。
  • @jcolebrand:是的,我也不知道,但我猜“drain”事件只是表明操作系统准备好立即写入,以防你真的想避免缓冲任何类型,无论是您自己的还是来自 WriteStream“写入”方法。但是,“drain”的文档中提到了“safe to write again”,这要么是一个糟糕的措辞选择,要么是不利于我的解释的证据!
【解决方案3】:

这就是我最终做到的方式。背后的想法是创建实现ReadStream接口的可读流,然后使用pipe()方法将数据通过管道传输到可写流。

var fs = require('fs');
var writeStream = fs.createWriteStream('someFile.txt', { flags : 'w' });
var readStream = new MyReadStream();

readStream.pipe(writeStream);
writeStream.on('close', function () {
    console.log('All done!');
});

MyReadStream类的例子可以取自猫鼬QueryStream

【讨论】:

  • 当我们只对将内容写入文件感兴趣时,为什么需要 ReadStream() ?
  • @nab 谢谢。管道时似乎没有为换行添加\r\n,所以将每一行连接到一个......
  • 找不到查询流
  • ReadStream 接口链接断开。
【解决方案4】:

我发现流在处理大文件时表现不佳 - 这是因为您无法设置足够的输入缓冲区大小(至少我不知道这样做的好方法)。我就是这样做的:

var fs = require('fs');

var i = fs.openSync('input.txt', 'r');
var o = fs.openSync('output.txt', 'w');

var buf = new Buffer(1024 * 1024), len, prev = '';

while(len = fs.readSync(i, buf, 0, buf.length)) {

    var a = (prev + buf.toString('ascii', 0, len)).split('\n');
    prev = len === buf.length ? '\n' + a.splice(a.length - 1)[0] : '';

    var out = '';
    a.forEach(function(line) {

        if(!line)
            return;

        // do something with your line here

        out += line + '\n';
    });

    var bout = new Buffer(out, 'ascii');
    fs.writeSync(o, bout, 0, bout.length);
}

fs.closeSync(o);
fs.closeSync(i);

【讨论】:

  • 您在readStream/writeStreamreadSync/writeSync 之间有任何基准测试来确认这个答案吗?谢谢。
  • 定义的“bout”变量是什么?
【解决方案5】:

处理此问题的最简洁方法是将您的线路生成器设为readable stream - 我们称之为lineReader。然后以下内容将自动处理缓冲区并为您很好地排出:

lineReader.pipe(fs.createWriteStream('someFile.txt'));

如果您不想制作可读流,您可以收听write 的输出以了解缓冲区是否已满,并做出如下响应:

var i = 0, n = lines.length;
function write () {
  if (i === n) return;  // A callback could go here to know when it's done.
  while (stream.write(lines[i++]) && i < n);
  stream.once('drain', write);
}
write();  // Initial call.

可以在here找到一个更长的例子。

【讨论】:

    【解决方案6】:

    这个问题的几个建议答案完全忽略了关于流的要点。

    这个模块可以帮助https://www.npmjs.org/package/JSONStream

    但是,让我们假设所描述的情况并自己编写代码。您正在从 MongoDB 作为流读取,默认情况下 ObjectMode = true。

    如果您尝试直接流式传输到文件,这将导致问题 - 例如“无效的非字符串/缓冲区块”错误。

    这类问题的解决方法很简单。

    只需在可读和可写之间放置另一个转换,以将可读的对象适当地调整为可写的字符串。

    示例代码解决方案:

    var fs = require('fs'),
        writeStream = fs.createWriteStream('./out' + process.pid, {flags: 'w', encoding: 'utf-8' }),
        stream = require('stream'),
        stringifier = new stream.Transform();
    stringifier._writableState.objectMode = true;
    stringifier._transform = function (data, encoding, done) {
        this.push(JSON.stringify(data));
        this.push('\n');
        done();
    }
    rowFeedDao.getRowFeedsStream(merchantId, jobId)
    .pipe(stringifier)
    .pipe(writeStream).on('error', function (err) {
       // handle error condition
    }
    

    【讨论】:

      【解决方案7】:

      如果您碰巧没有输入流,则无法轻松使用管道。 以上都不适合我,排水事件不会触发。解决如下(基于 Tylers 的回答):

      var lines[]; // some very large array
      var i = 0;
      
      function write() {
          if (i < lines.length)  {
              wstream.write(lines[i]), function(err){
                  if (err) {
                      console.log(err);
                  } else {
                      i++;
                      write();
                  }
              });
          } else {
              wstream.end();
              console.log("done");
          }
      };
      write();
      

      【讨论】:

        猜你喜欢
        • 2011-01-30
        • 2019-06-25
        • 1970-01-01
        • 2015-08-25
        • 2014-02-15
        • 1970-01-01
        • 1970-01-01
        • 2012-08-01
        相关资源
        最近更新 更多