【问题标题】:Concatenate two (or n) streams连接两个(或 n 个)流
【发布时间】:2013-05-02 02:55:39
【问题描述】:
  • 2 流:

    鉴于可读的streamsstream1stream2获取包含stream1stream2 连接的流的惯用(简洁)方法是什么?

    我不能stream1.pipe(outStream); stream2.pipe(outStream),因为这样流的内容就混在一起了。

  • n 个流:

    给定一个EventEmitter,它发出不确定数量的流,例如

    eventEmitter.emit('stream', stream1)
    eventEmitter.emit('stream', stream2)
    eventEmitter.emit('stream', stream3)
    ...
    eventEmitter.emit('end')
    

    获取所有流连接在一起的流的惯用(简洁)方式是什么?

【问题讨论】:

    标签: node.js stream eventemitter


    【解决方案1】:

    您也许可以使其更简洁,但这是一个可行的方法:

    var util = require('util');
    var EventEmitter = require('events').EventEmitter;
    
    function ConcatStream(streamStream) {
      EventEmitter.call(this);
      var isStreaming = false,
        streamsEnded = false,
        that = this;
    
      var streams = [];
      streamStream.on('stream', function(stream){
        stream.pause();
        streams.push(stream);
        ensureState();
      });
    
      streamStream.on('end', function() {
        streamsEnded = true;
        ensureState();
      });
    
      var ensureState = function() {
        if(isStreaming) return;
        if(streams.length == 0) {
          if(streamsEnded)
            that.emit('end');
          return;
        }
        isStreaming = true;
        streams[0].on('data', onData);
        streams[0].on('end', onEnd);
        streams[0].resume();
      };
    
      var onData = function(data) {
        that.emit('data', data);
      };
    
      var onEnd = function() {
        isStreaming = false;
        streams[0].removeAllListeners('data');
        streams[0].removeAllListeners('end');
        streams.shift();
        ensureState();
      };
    }
    
    util.inherits(ConcatStream, EventEmitter);
    

    我们使用streams(流队列;push 位于后面,shift 位于前面)、isStreamingstreamsEnded 来跟踪状态。当我们得到一个新的流时,我们推送它,当一个流结束时,我们停止监听并转移它。当流结束时,我们设置streamsEnded

    在每个事件中,我们检查我们所处的状态。如果我们已经在流式传输(管道传输),我们什么也不做。如果队列为空并且设置了streamsEnded,我们将发出end 事件。如果队列中有东西,我们会恢复它并监听它的事件。

    *请注意,pauseresume 是建议性的,因此某些流可能无法正确运行,并且需要缓冲。这个练习留给读者。

    完成所有这些后,我将通过构造一个EventEmitter 来处理n=2 的情况,用它创建一个ConcatStream,并发出两个stream 事件,然后是一个end 事件。我敢肯定它可以做得更简洁,但我们不妨使用我们所拥有的。

    【讨论】:

    • 谢谢亚伦!我有点希望有一些现有的库,所以我可以用三行来解决它。如果没有,我想我可能会将您的解决方案提取到一个包中。我可以在 MIT 许可下使用你的代码吗?
    • 啊,找到stream-stream库了。看我的回答。
    • @JoLiss 我也先找了一些东西,但我没有找到那个选项。如果您仍然愿意,当然可以在库中使用我的代码。
    【解决方案2】:

    combined-stream 包连接流。自述文件中的示例:

    var CombinedStream = require('combined-stream');
    var fs = require('fs');
    
    var combinedStream = CombinedStream.create();
    combinedStream.append(fs.createReadStream('file1.txt'));
    combinedStream.append(fs.createReadStream('file2.txt'));
    
    combinedStream.pipe(fs.createWriteStream('combined.txt'));
    

    我相信您必须一次附加所有流。如果队列为空,combinedStream 将自动结束。见issue #5

    stream-stream 库是具有显式 .end 的替代方案,但它不太受欢迎,而且可能没有经过充分测试。它使用 Node 0.10 的 streams2 API(参见this discussion)。

    【讨论】:

    • combined-stream 包已经支持在回调函数中添加源流,因此您不必在开始时启动它们,这有助于节省内存、文件描述符等。此外,还有很多更受欢迎的库 multistream 似乎经过更多测试
    【解决方案3】:

    streamee.js 是一组基于 node 1.0+ 流的流转换器和作曲家,包括一个连接方法:

    var stream1ThenStream2 = streamee.concatenate([stream1, stream2]);
    

    【讨论】:

    • 谢谢,我去看看。我假设那是 Node 0.10?
    • 是 Node 0.10,但您可以将旧式流包装成 README 中所写的 0.10+ 流
    【解决方案4】:

    https://github.com/joepie91/node-combined-stream2 是组合流模块(如上文所述)的插入式 Streams2 兼容替代品。它自动包装 Streams1 流。

    combined-stream2 的示例代码:

    var CombinedStream = require('combined-stream2');
    var fs = require('fs');
    
    var combinedStream = CombinedStream.create();
    combinedStream.append(fs.createReadStream('file1.txt'));
    combinedStream.append(fs.createReadStream('file2.txt'));
    
    combinedStream.pipe(fs.createWriteStream('combined.txt'));
    

    【讨论】:

      【解决方案5】:

      这可以用 vanilla nodejs 完成

      import { PassThrough } from 'stream'
      const merge = (...streams) => {
          let pass = new PassThrough()
          let waiting = streams.length
          for (let stream of streams) {
              pass = stream.pipe(pass, {end: false})
              stream.once('end', () => --waiting === 0 && pass.emit('end'))
          }
          return pass
      }
      

      【讨论】:

      • 如果一个流永远不会结束,而另一个流会怎样
      • 只是更新pass.emit('end') 不起作用。试试pass.end()
      • --waiting 更改为 waiting--
      • @TomLarkworthy 这是对@PirateApp 的回应吗?如果不是,那么我不明白为什么要这样做,因为这会导致最后一个流永远不会结束
      • 这个解决方案很好,但由于某些奇怪的原因,我的使用中没有保持流的顺序。调用merge(a, b) 产生了一个流,其中ba 之前。是否与ba 相比,它的项目流少得多,并且先结束这一事实有关?
      【解决方案6】:

      如果您不关心流中数据的顺序,在nodejs 中进行简单的reduce 操作应该没问题!

      const {PassThrough} = require('stream')
      
      let joined = [s0, s1, s2, ...sN].reduce((pt, s, i, a) => {
        s.pipe(pt, {end: false})
        s.once('end', () => a.every(s => s.ended) && pt.emit('end'))
        return pt
      }, new PassThrough())
      

      干杯 ;)

      【讨论】:

      • 你不应该从reduce返回一些东西吗?这看起来像 joined 将是未定义的。
      • 警告:这将导致所有流并行传输到 PassThrough 流,而不考虑数据的顺序,很可能会损坏您的数据。
      • @LeonLi 这确实是这种方法的目的。如果您想保留顺序,您可以将不同于 PassThrough 的初始值传递给您的 reduce 函数;)
      • @Ivo 这个问题询问的是 concatenation。因此,大多数到达此 QA 的读者都会关心订购。这个答案默默地误导了那些读者,因为流成功地通过了,但除非你检查输出,否则你永远不会知道它也会混淆你的所有数据(这个问题首先特别要求避免!)。我敦促您将此信息添加到答案正文中。
      • 没有stream.ended 这样的东西。您必须在结束事件处理程序中设置 s.ended = true
      【解决方案7】:

      在 vanilla nodejs 中使用 ECMA 15+ 并结合 IvoFeng​​strong> 的好答案。

      PassThrough 类是一个普通的Transform 流,它不会以任何方式修改流。

      const { PassThrough } = require('stream');
      
      const concatStreams = (streamArray, streamCounter = streamArray.length) => streamArray
        .reduce((mergedStream, stream) => {
          // pipe each stream of the array into the merged stream
          // prevent the automated 'end' event from firing
          mergedStream = stream.pipe(mergedStream, { end: false });
          // rewrite the 'end' event handler
          // Every time one of the stream ends, the counter is decremented.
          // Once the counter reaches 0, the mergedstream can emit its 'end' event.
          stream.once('end', () => --streamCounter === 0 && mergedStream.emit('end'));
          return mergedStream;
        }, new PassThrough());
      

      可以这样使用:

      const mergedStreams = concatStreams([stream1, stream2, stream3]);
      

      【讨论】:

      • 这会在流完成之前对流进行管道化,将它们混杂在一起;这正是最初的问题要避免的问题 - 如何连接而不是混乱流。
      • 为了避免这种情况,你应该在前一个触发'end'事件之后stream.pipe下一个
      【解决方案8】:

      下面的代码对我有用:)。已从之前给出的所有答案中获取输入

        const pipeStreams = (streams) => {
        const out = new PassThrough()
        // Piping the first stream to the out stream
        // Also prevent the automated 'end' event of out stream from firing
        streams[0].pipe(out, { end: false })
        for (let i = 0; i < streams.length - 2; i++) {
          // On the end of each stream (until the second last) pipe the next stream to the out stream
          // Prevent the automated 'end' event of out stream from firing
          streams[i].on('end', () => {
            streams[i + 1].pipe(out, { end: false })
          })
        }
        // On the end of second last stream pipe the last stream to the out stream.
        // Don't prevent the 'end flag from firing'
        streams[streams.length - 2].on('end', () => {
          streams[streams.length - 1].pipe(out)
        })
        return out
      } 
      

      【讨论】:

        【解决方案9】:

        这里两个最受好评的答案都不适用于异步流,因为它们只是通过管道传输内容,而不管源流是否已准备好生成。我必须将内存中的字符串流与来自数据库的数据馈送相结合,并且数据库内容始终位于结果流的末尾,因为它需要一秒钟才能获得数据库响应。这就是我最终为我的目的而写的内容。

        export function joinedStream(...streams: Readable[]): Readable {
          function pipeNext(): void {
            const nextStream = streams.shift();
            if (nextStream) {
              nextStream.pipe(out, { end: false });
              nextStream.on('end', function() {
                pipeNext();
              });
            } else {
              out.end();
            }
          }
          const out = new PassThrough();
          pipeNext();
          return out;
        }
        

        【讨论】:

          【解决方案10】:

          现在可以使用异步迭代器轻松完成此操作

          async function* concatStreams(readables) {
            for (const readable of readables) {
              for await (const chunk of readable) { yield chunk }
            }
          } 
          

          你可以这样使用它

          const fs = require('fs')
          const stream = require('stream')
          
          const files = ['file1.txt', 'file2.txt', 'file3.txt'] 
          const iterable = await concatStreams(files.map(f => fs.createReadStream(f)))
          
          // convert the async iterable to a readable stream
          const mergedStream = stream.Readable.from(iterable)
          

          有关异步迭代器的更多信息:https://2ality.com/2019/11/nodejs-streams-async-iteration.html

          【讨论】:

          • createReadStream 返回的流不可迭代。
          • 你的意思是mergedStream吗?因为我可以毫无问题地迭代它gist.github.com/ducaale/5e3fd00a70487c98333e5fb42bc4b624
          • 如果不需要订单,ss可以等待所有的,以便并行运行,从而更快?
          • 当然,您可以通过await readable.next() 手动从两个异步迭代器中获取下一个项目,然后获取第一个解析的项目。
          • 这个选项在我看来是最好的,但打字稿实现必须在签名上添加...,像这样ts async function* concatStreams(...readables) {
          猜你喜欢
          • 2010-11-11
          • 2021-12-08
          • 1970-01-01
          • 1970-01-01
          • 1970-01-01
          • 2015-04-06
          • 1970-01-01
          • 2015-09-07
          • 1970-01-01
          相关资源
          最近更新 更多