【问题标题】:How to get a unified onFinish from separate streams (some created from within original stream)如何从单独的流中获得统一的 onFinish(一些从原始流中创建)
【发布时间】:2015-10-28 12:23:28
【问题描述】:

我有一个这样的流式处理:

Incomming file via HTTP (original stream)
  -> Check if zipfile
    - Yes -> push through an unzip2-stream
    - No -> push to S3

当 unzip2-stream 找到 zip-entries 时,它们会通过相同的流链推送,即

Incomming file entry from zip file ("child" stream)
  -> Check if zipfile
    - Yes -> push through an unzip2-stream
    - No -> push to S3

感谢https://stackoverflow.com/users/3580261/eljefedelrodeodeljefe 在这次谈话之后我设法解决了主要问题: How to redirect a stream to other stream depending on data in first chunk?

为每个 zip 条目创建新的“子”流的问题在于,这些流与原始流没有连接,因此我无法为所有流获得统一的 onFinish。

我不想在处理(解压缩并发送到 S3)每个文件之前向发件人发送 202。我怎样才能做到这一点?

我在想我可能需要某种控制对象来等待所有子流的 onFinish 并强制进程停留在原始的 onFinish 事件中,直到所有文件都被处理。这会不会有点矫枉过正?有没有更简单的解决方案?

【问题讨论】:

    标签: node.js stream unzip


    【解决方案1】:

    我最终为流制作了一个单独的计数器。可能有更好的解决方案,但这可行。

    我将计数器对象作为参数发送给我的 saveFile() 函数的第一次调用。计数器被传递给解压缩流,因此它可以传递给每个文件条目的 saveFile。

    • 就在流开始(即管道)之前,我调用了 streamCounter.streamStarted()。
    • 在管道链的最后一个 onFinish 中,我调用了 streamCounter.streamFinished()
    • 如果流变坏,我会调用 streamCounter.streamFailed()

    就在我以 post route 形式发送 202 之前,我等待 streamCounter.streamPromise 解决。

    我对 setInterval 解决方案并不十分自豪。发射某种事件可能会更好。

    module.exports.streamCounter = function() {
      let streamCount = 0;
      let isStarted = false;
      let errors = [];
    
      this.streamStarted = function(options) {
        isStarted = true;
    
        streamCount += 1;
        log.debug(`Stream started for ${options.filename}. New streamCount: ${streamCount}`);
      };
    
      this.streamFinished = function(options) {
        streamCount -= 1;
        log.debug(`Finished stream for ${options.filename}. New streamCount: ${streamCount}`);
      };
    
      this.streamFailed = function(err) {
        streamCount -= 1;
        errors.push(err);
        log.debug(`Failed stream because (${err.message}). New streamCount: ${streamCount}`);
      };
    
      this.streamPromise = new Promise(function(resolve, reject) {
        let interval = setInterval(function() {
          if(isStarted && streamCount === 0) {
            clearInterval(interval);
    
            if(errors.length === 0) {
              log.debug('StreamCounter back on 0. Resolving streamPromise');
              resolve();
            } else {
              log.debug('StreamCounter back on 0. Errors encountered.. Rejecting streamPromise');
              reject(errors[errors.length-1]);
            }
          }
        }, 100);
      });
    };
    

    起初我用一个 promise 数组尝试了这个概念,并在发送状态 202 之前等待 Promise.all()。但据我所知,Promise.all() 仅适用于静态数组。我的“streamCount”在流式传输期间发生变化,因此我需要一个更加动态的“Promise.all”。

    【讨论】:

      猜你喜欢
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 2019-04-07
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      相关资源
      最近更新 更多