【问题标题】:wait for previous stream to be empty before allowing reading在允许读取之前等待上一个流为空
【发布时间】:2016-03-22 15:43:37
【问题描述】:

假设我有一个包含整数列表的文件,每行一个。我使用fs.createReadStream 并将其传递给split(这样每个块都是一个整数)。然后我将其通过管道传输到一个双工流中,该流应该添加数字并通过管道将总和写入fs.createWriteStream

var fs = require('fs');
var stream = require('stream');
var split = require('split');

var addIntegers = new stream.Duplex();

addIntegers.sum = 0;

addIntegers._read = function(size) {
  this.push(this.sum + '\n');
}

addIntegers._write = function(chunk, encoding, done) {
  this.sum += +chunk;
  done();
}

fs.createReadStream('list-of-integers.txt')
  .pipe(split())
  .pipe(addIntegers)
  .pipe(fs.createWriteStream('sum.txt'));

当我运行它时,sum.txt 会不断地被零填充,并且程序永远不会终止(如预期的那样)。如何在允许输出流 (fs.createWriteStream) 从 addIntegers 读取之前等待输入流 (split) 为空?

【问题讨论】:

  • 为什么不在_write() 内代替push()
  • 嗯,是的,我可以在_write 中使用转换流和this.push(this.sum + '\n')。我以这种方式得到一个运行总数,这比永远为零要好得多。但是,有什么办法可以得到总和吗? (写一次到sum.txt 而不是 n 次?)。我意识到之后我可以tail -n 1 sum.txt,但我想知道是否有一种惯用的node 方式来获得我想要的东西。

标签: node.js stream duplex


【解决方案1】:

我想通了。

我决定改用Transform 流(感谢 mscdex),因为它有一个方法(_flush),在消耗所有写入数据后会被调用。工作代码如下。别忘了npm i split :)

var fs = require('fs');
var stream = require('stream');
var split = require('split');

var addIntegers = new stream.Transform();

addIntegers.sum = 0;

addIntegers._transform = function(chunk, encoding, done) {
  this.sum += +chunk;
  done();
}

addIntegers._flush = function(done) {
  this.push(this.sum + '\n');
}

fs.createReadStream('list-of-integers.txt')
  .pipe(split())
  .pipe(addIntegers)
  .pipe(fs.createWriteStream('sum.txt'));

【讨论】:

    猜你喜欢
    • 1970-01-01
    • 2021-12-04
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2015-06-15
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多