【问题标题】:How implement back pressure manually如何手动实现背压
【发布时间】:2019-11-22 05:03:52
【问题描述】:

我有子进程,因为我正在将流传输到父进程。

在 child.js 中

  let stream = readdirp(pathname);
  stream.pipe.pipe(process.stdout);   

在 parent.js 中

let file = child => {
  let estream = es.map((data, next) => {
    _this.s3MultiUpload(JSON.parse(data), data, next);
    //i uploding this files to s3.
  });
  child.on("end", (code, signal) => {
    console.log("stream ended"); // `here is my problem`
    child.kill();
  });
  child.on("exit", (code, signal) => {
    console.log(code);
    console.log(signal);
    child.kill();
  });
  return estream;
};
child = fork(filePath, { silent: true });
child.stdout.pipe(this.file(child));

我的问题是在我将所有文件上传到 s3 流之前结束。我研究了背压,但我不明白如何在这里实现?

我想我需要添加回调或其他东西来处理标准输出管道。我不知道

你能帮帮我吗

【问题讨论】:

  • 你通过分叉让事情变得比你需要的更复杂。 IO 不受 CPU 限制,因此使用Promise.all(await util.promisify(fs.readDir)(pathname)).map(file => ...) 会更好。另外,只是为了争论twitter.com/headinthebox/status/774635475071934464
  • @AluanHaddad 我看不懂,能详细点吗??
  • 我是说你不需要使用子进程、ad-hoc observable 类似的机制,甚至是回调。只需使用 JavaScript 语言以及 NodeJS 内置模块 utilfs
  • @AluanHaddad 起初我只在主进程中尝试过,如果目录中有大文件意味着我的线程挂起,所以将它移到另一个线程。我正在使用 readdirp 读取嵌套目录,并添加过滤器,以及更多选项
  • 您是在处理文件还是只是上传文件? Promise 防止阻塞,即使是单线程你的工作负载是 IO 绑定而不是 CPU 绑定

标签: node.js stream es6-promise child-process asynchronous-javascript


【解决方案1】:

这种方法不必要地复杂。由于 IO 操作不受 CPU 限制,我们最好将 Promises 与 JavaScript 的 async/await* 语法一起使用来执行并行文件上传。构建我们自己的同步机制很复杂,并且会出现许多重叠的语言和库级别的概念1

基于readdirp documentation,但注意到我对特定的上传 API 不熟悉,我会建议这些方面的内容

const readdirp = require('readdirp');
const util = require('util');
const fs = require('fs');

const readfile = util.promisify(fs.readfile);

(async function () {
  // Use streams to achieve small RAM & CPU footprint.
  // 1) Streams example with for-await. Node.js 10+ only.
  const paths = [];
  for await (const {path} of readdirp('pending-uploads')) {
    paths.push(path);
  }

  const uploadPromises = paths
    .map(readFile)
    .map(JSON.parse).
    .map(data => s3MultiUpload(data));

  await Promise.all(uploadPromises);
}());

1。背压是在将 Reactive Extensions 库移植到用 Java 实现它的 JVM 的过程中产生的压力之一。只是为了争论(理智?)考虑what Erik Meijer says regarding backpressure

【讨论】:

    猜你喜欢
    • 1970-01-01
    • 2021-06-22
    • 1970-01-01
    • 2014-01-27
    • 2019-12-08
    • 1970-01-01
    • 1970-01-01
    • 2017-05-29
    • 2012-04-22
    相关资源
    最近更新 更多