【问题标题】:stream.Transform accept new input before completing outputstream.Transform 在完成输出之前接受新输入
【发布时间】:2015-01-07 14:51:49
【问题描述】:

我正在实现一个带有异步操作的转换流。我的叫Parser

var Transform = require('stream').transform;

function Parser(options) {
  Transform.call(this, {objectMode: true});
}

Parser.prototype._transform = function _transform(input, encoding, callback) {
  var this_ = this;
  doSomethingAsync(input, function(output) {
    this_.push(output);
    //possible location #1 for callback();
  });
  //possible location #2 for callback();
}

每个传入的块可能需要很长时间来处理(doSomethingAsync 需要网络请求)。但是,每个块的处理完全独立于之前的块。此外,输出的确切顺序并不重要。每个输出都包含一个描述符,用于标识其输入,而不是按顺序标识。

因此,我希望尽快再次调用_transform,而不是等到给定块完全完成处理。所以,看看代码,如果我把callback() 放在possible location #1 中,那么在每个块被完全处理之前,_transform 永远不会被调用。但是如果我把它放在possible location #2中,那么我的流在回调之后推送,导致这些难看

Uncaught Error: stream.push() after EOF

流终止时出错。

所以我的问题是:是否可以使用转换流来做到这一点?还是我应该考虑使用图书馆?如果有,是哪种类型(事件流、FRP 等)?

谢谢。

【问题讨论】:

    标签: node.js stream


    【解决方案1】:

    您可以在流上实现@​​987654321@,并且仅在所有异步函数完成时才调用传递给该函数的回调。像这样的:

    function Parser(options) {
      Transform.call(this, {objectMode: true});
    
      this._pending = 0;
      this._flushcb = undefined;
    }
    
    Parser.prototype._transform = function _transform(input, encoding, callback) {
      var self = this;
    
      ++this._pending;
    
      doSomethingAsync(input, function(output) {
        self.push(output);
        if (--self._pending === 0 && self._flushcb)
          self._flushcb();
      });
    
      callback();
    }
    
    Parser.prototype._flush = function(callback) {
      this._flushcb = callback;
    };
    

    【讨论】:

    • 非常好——我没想到。谢谢!
    【解决方案2】:

    我相信答案并不完整。 想象一下你有一个这样的_transform()

    _transform(chunk, encoding, done) {
        let data = chunk.toString();
        this.rest += data;
        [this.toPush, this.rest] = this.f(this.rest);
        for (let i = 0; i < this.toPush.length; i++) {
            if (!this.push(this.toPush[i])) {
                this._source.readStop();
                break;
            } 
        }
        done()
    }
    

    ```

    其中f 是例如将接收到的块拆分为段落的函数。 rest 是块末尾的东西 f 无法确定它是否是整个段落,因此需要更多数据(另一个块)。阅读完所有内容后,可以假设rest 是一个完整的段落,然后使用_flush 来推送它,如下所示。抛出上述异常,可能是因为"&lt;p&gt;"+this.rest+"&lt;/p&gt;" 大于this.rest。这不是真正的预期行为......

     _flush(done) {
        if (this.rest !== "") this.push("<p>"+this.rest+"</p>");
        this.rest = null;
        this.toPush = null;
        done()
    
     }
    

    编辑: 所以 Calvin Metcalf 在这里给了我一个工作https://github.com/nodejs/readable-stream/issues/207:在节点 8.0.0 上,可以使用 _final 而不是 _flush。 这个问题似乎很不稳定,因为他没有在他的环境中繁殖。

    【讨论】:

      猜你喜欢
      • 2014-03-23
      • 1970-01-01
      • 1970-01-01
      • 2020-07-15
      • 1970-01-01
      • 2020-10-01
      • 1970-01-01
      • 1970-01-01
      • 2016-12-22
      相关资源
      最近更新 更多