【问题标题】:How to stream to a Promise?如何流式传输到 Promise?
【发布时间】:2021-07-16 22:05:53
【问题描述】:

当我将POST 数据写入我的应用程序时,我会将其写入文件并通过PUT 与其他实例共享。我想从POST 返回每​​个流的状态(文件和 PUT)。

putResults 是一个数组,它是封闭类的一部分,用于保存每个请求的结果。

如何收集回复?我可以从createWriteStreams 返回一组 Promise,但是我怎么能将req.pipe 给他们呢?你可以流式传输到 Promise 吗?

  post(req, res, next) {
    let listeners = this.getWriteStreams();
    let c = listeners.length;
    for (i = 0; i < c; i++) {
      req.pipe(listeners[i]);
    }
    
    /* What do I put here to return after all requests finish? */

  }

  put(req, res, next) {
    var fstream = fs.createWriteStream(this.path);
    req.pipe(fstream);
    req.on('end', () => {
      fstream.close();
      res.status(201).send("OK");
    });
    req.on('error', (err) => {
      res.status(500).send(err);
    });
  }

  createWriteStreams() {
    let listeners = [];
    // We always want to save to a file
    listeners.push(fs.createWriteStream(this.path).on('close', ()=>{
      this.putResults.push({ host: hutil.myHost, status: 201 });
    })); 
    // If there are other servers in current env, send to them, too!
    let otherGuys = hostutil.otherServers();
    if (otherGuys.length > 0) {
      for (i = 0; i < otherGuys.length; i++) {
        let opts = {
          hostname: hutil.fq(otherGuys[i]),
          port: this.port, 
          path: this.endpoint,
          method: 'PUT',
        };
        let req = https.request(opts, res => {
          this.putResults.push({ host: opts.hostname, status: res.statusCode});
        });
        req.on('error', (e) => {
          this.putResults.push({ host: opts.hostname, status: e });
        });

        listeners.push(req);
      }
    }
    return listeners;
  }

【问题讨论】:

  • 抱歉,您能否添加一张图表来解释它的工作原理以及您想要完成的任务?
  • 我不反对图表,但它确实有点太简单了。我在POST 请求中获取数据,并将其流式传输到文件系统和可变数量的PUT 请求。问题是我希望原始的POST 返回一个response,它聚合了所有streamsresponses,并且只有在它们完成后才这样做。
  • 当所有四个请求都完成(成功或错误)时,基本上是时候res.end()了。你需要一个计数器,或者类似Promise.all/.allSettled
  • 让我困惑的是流。我需要从输入请求流式传输到文件 + 3 个输出请求。然后等待关闭文件和对 3 个请求的 3 个响应。太混乱了——我把Promise.all放在哪里以及如何通过它流式传输?我可以把它放在req.pipe(listeners[i]) 循环之后吗?

标签: node.js express promise stream


【解决方案1】:

好吧,万一有人有这个问题,知识的关键点是输入流可以传递,就好像它上面有多个自定义插口一样 - 打开一个似乎不会过早地将数据喷洒到整个其他放置软管的地方!

因此,由于您无法流式传输到 Promise,因此您仍然可以流式传输到流,并且您显然可以花时间设置它们。这是我的解决方案:将请求传递给封装在 Promise 中的流。

function post(req, res, next) {
  let promises = this.streamPromises(req);

  Promise.allSettled(promises).then((results) => {
    // Remove the Promise container junk - results come in 2 completely different object structures. Great design, jeez. :-\
    let noContainer = results.map(item => item.value).filter(i => i != undefined);
    noContainer.push(...results.map(item => item.reason).filter(i => i != undefined));

    res.status(200).json(noContainer);
  }).catch(err => {
    log.warn(`POST request for ${this.filename} failed, at least in part: ${err}`)
    res.status(200).json({ host: hutil.myHost, status: err });
  });
}

function put(req, res, next) {
  var fstream = fs.createWriteStream(this.fqFile);
  req.pipe(fstream);
  req.on('end', () => {
    fstream.close();
    log.info(`${req.transID} Saved data to ${this.fqFile} sent by ${req.referer}`);
    res.status(201).send("OK");
  });
  req.on('error', (err) => {
    log.warn(`${req.transID} Error receiving/saving PUT file ${this.fqFile} sent by ${req.referer}`);
    res.status(500).send(err);
  });
}

function streamPromises(postReq) {
  let listeners = [];

  listeners.push(this.streamLocalFrom(postReq));  // add file first

  // Send to other servers in the environment
  let otherGuys = hosts.peerServers();
  if (otherGuys.length > 0) {
    for (i = 0; i < otherGuys.length; i++) {
      let opts = {
        hostname: hosts.fq(otherGuys[i]),
        thatHost: otherGuys[i], // ducked this into the object to avoid parsing fq hostname
        port: appconfig.port, // we are all listening here
        path: this.endpoint,
        method: 'PUT',
        timeout: 1000,
        ca: fs.readFileSync(appconfig.authorityFile)
      };
      let p = new Promise((resolve, reject) => {
        let req = https.request(opts, res => {
          log.info(`${this.filename}: Response from ${opts.hostname}:${opts.port}: ${res.statusCode}`);
          // let hn = opts.hostname.match(/(.*?)\./)[1] || opts.hostname;
          resolve({ host: opts.thatHost, status: res.statusCode });
        });
        req.on('error', (e) => {
          log.warn(`Error piping ${this.filename} to ${opts.hostname}:${opts.port}: ${e}`);
          reject({ host: opts.thatHost, status: e });
        });
        postReq.pipe(req);
      });
      listeners.push(p);
    }
  }
  return listeners;
}

function streamLocalFrom(postReq) {
  return new Promise((resolve, reject) => {
    let fileError = false;
    let fstream = fs.createWriteStream(this.fqFile);
    fstream.on('close', (err) => {
      if (!fileError) {
        log.info(`Saved data to file at ${this.fqFile}`);
        resolve({ host: me, status: 201 });
      }
    });
    fstream.on('error', (err) => {
      log.warn(`Could not save ${this.fqFile} because ${err}`);
      reject({ host: hutil.myHost, status: 500 });
      fileError = true;
      fstream.close();
    });
    postReq.pipe(fstream);
  });
}

【讨论】:

    猜你喜欢
    • 2011-12-24
    • 2013-01-22
    • 2016-04-23
    • 1970-01-01
    • 2019-01-20
    • 1970-01-01
    • 1970-01-01
    • 2015-02-07
    • 1970-01-01
    相关资源
    最近更新 更多