【问题标题】:How to make concurrent Node.js stream processing while preserving order?如何在保持顺序的同时进行并发 Node.js 流处理?
【发布时间】:2017-12-28 04:50:33
【问题描述】:

我有一个使用流的复杂数据处理管道,其中我有一个可读流input、一个可写流output,以及一系列转换流(我们称它们为step1step2、@ 987654327@ 和 step4)。而step1step3output 是无状态的,仅依赖传入的数据块来产生它们的输出,块对块、step2step4 是聚合步骤,从多个块收集数据到产生它们的输出,并且通常具有在时间上重叠的输出(例如,chunk1、chunk3 和 chunk5 可能会产生 output1,chunk2 和 chunk4 可能会产生 output2,等等)。

目前,管道结构如下:

input.pipe(step1).pipe(step2).pipe(step3).pipe(step4).pipe(output);

这个管道在计算上非常昂贵,因此我想将它拆分到多个实例中,最好在多个内核上运行。 Node.js 流保证订单保存,因此 Node.js 似乎平衡了消息传递,以便首先从一个步骤出来的数据块首先传递到下一步,这是我在任何方法上都需要具备的属性我想出让这个计算并发的方法。

我绝对不是要求牵手,更如果有人以前解决过这个问题,以及用于这种事情的一般方法。我不确定从哪里开始。

【问题讨论】:

  • 不确定是什么问题?在.pipe(step1) 进程完成之前,不应到达.pipe(step2)
  • 您正在尝试解决的当前存在的实际问题是什么?
  • @guest271314 我正在尝试采用 Node.js 流式数据处理应用程序,并将其拆分,以便不同阶段可以同时在不同块上运行,最好跨多个处理器内核以提高性能
  • 没有尝试过node.js implementation of streams,虽然已经尝试过基于specification实现的浏览器上可用的本机流。您目前如何在不同的处理器内核上执行不同的操作,并在程序完成时收到通知? Question 中的代码有什么问题?
  • @guest271314 不幸的是,您和我似乎在谈论两种截然不同的流

标签: javascript node.js concurrency stream


【解决方案1】:

虽然我无法完成订单保存,但我支持的流式传输框架 scramjet 将让您非常接近实现您的目标。

我会在这里推动您找到最佳解决方案:

let seq = 0;
source.pipe(new DataStream())
    .map(data => {data, itr: seq++})        // mark your order
    .separate(x => x % 8)                   // separate into 8 streams
    .cluster((stream) => {                  // spawn subprocesses
         // do your multi threaded transforms here
    }, {threads: 8})
    .mux((a, b) => a.itr - b.itr)           // merge in the order above

在某些时候我会介绍重新排序,但为了保持抽象,我不能走太多捷径,但你可以像上面例子中的 2^52 限制那样使用你的捷径(seq 会用完位然后增加空间)。

这应该会引导您找到一些解决方案。

【讨论】:

    猜你喜欢
    • 1970-01-01
    • 1970-01-01
    • 2014-02-26
    • 2018-01-12
    • 2015-05-26
    • 1970-01-01
    • 2021-11-22
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多