【问题标题】:How to fork a stream using highland.js?如何使用 highland.js 分叉流?
【发布时间】:2017-02-06 17:38:34
【问题描述】:

我有一个由BaseData 对象组成的sourceStream

我想将此流分成n-amount 个不同的流,然后根据自己的喜好过滤和转换每个BaseData 对象。

最后,我希望 n 流仅包含特定类型,并且分叉的流的长度可能会有所不同,因为将来可能会删除或添加数据。

我想我可以通过fork 设置它:

import * as _ from 'highland';

interface BaseData {
    id: string;
    data: string;
}

const sourceStream = _([
    {id: 'foo', data: 'poit'},
    {id: 'foo', data: 'fnord'},
    {id: 'bar', data: 'narf'}]);

const partners = [
    'foo',
    'bar',
];

partners.forEach((partner: string) => {
    const partnerStream = sourceStream.fork();

    partnerStream.filter((baseData: BaseData) => {
        return baseData.id === partner;
    });

    partnerStream.each(console.log);
});

我希望现在有两个流,foo-stream 包含两个元素:

{ id: 'foo', data: 'poit' }
{ id: 'foo', data: 'fnord' }

bar-stream 包含一个元素:

{ id: 'bar', data: 'narf' }

但我得到了一个错误:

/usr/src/marketing-tasks/node_modules/highland/lib/index.js:1338
        throw new Error(
        ^

Error: Stream already being consumed, you must either fork() or observe()
    at Stream._addConsumer (/usr/src/marketing-tasks/node_modules/highland/lib/index.js:1338:15)
    at Stream.consume (/usr/src/marketing-tasks/node_modules/highland/lib/index.js:1500:10)
    at Stream.each (/usr/src/marketing-tasks/node_modules/highland/lib/index.js:1774:18)
    at partners.forEach (/usr/src/marketing-tasks/dist/bin/example.js:17:19)
    at Array.forEach (native)
    at Object.<anonymous> (/usr/src/marketing-tasks/dist/bin/example.js:12:10)
    at Module._compile (module.js:570:32)
    at Object.Module._extensions..js (module.js:579:10)
    at Module.load (module.js:487:32)
    at tryModuleLoad (module.js:446:12)

如何将一个流分叉成多个流?


我也尝试链接调用,但我只得到一个流的结果:

partners.forEach((partner: string) => {
    console.log(partner);
    const partnerStream = sourceStream
        .fork()
        .filter((item: BaseData) => {
            return item.id === partner;
        });

    partnerStream.each((item: BaseData) => {
        console.log(item);
    });
});

仅打印:

foo
{ id: 'foo', data: 'poit' }
{ id: 'foo', data: 'fnord' }
bar

而不是预期的:

foo
{ id: 'foo', data: 'poit' }
{ id: 'foo', data: 'fnord' }
bar
{id: 'bar', data: 'narf'}

也可能是我误解了fork 的全部含义。根据its doc entry

Stream.fork() 分叉一个流,允许你添加额外的消费者 具有共同的背压。分叉给多个消费者的流将 仅以最慢的消费者可以最快的速度从其源中提取值 处理它们。

注意:不要依赖分叉之间一致的执行顺序。 此转换仅保证所有分叉将处理值 foo 在任何将处理第二个值栏之前。它不保证 分叉处理 foo 的顺序。

提示:在分叉(或 使用这样做的库)。 由于相同的值将被传递给 每个分叉,在一个分叉中所做的更改将在任何分叉中可见 在它之后执行。 加上不一致的执行顺序,并且 您最终可能会遇到细微的数据损坏错误。如果需要修改 任何值,您都应该复制并修改副本。

弃用警告:目前可以在之后分叉流 使用它(例如,通过转换)。这将不再可能 在下一个主要版本中。如果你要分叉一个流,总是 叫 fork 就可以了。

所以不是“如何分叉流?”我的实际问题可能是:如何将高原溪流即时复制到不同的溪流中?

【问题讨论】:

    标签: javascript typescript stream highland.js


    【解决方案1】:

    partnerStream.filter() 返回一个 new 流。然后,您将再次使用partnerStream.each() 消费partnerStream,而无需调用fork()observe()。因此,要么链接partnerStream.filter().each() 调用,要么将partnerStream.filter() 的返回值分配给一个变量,然后调用.each()

    【讨论】:

    • 我已经更新了我的问题,因为现在我没有看到错误,但我只看到一个流的返回值,而不是两个。
    【解决方案2】:

    必须记住,在创建所有分叉之前不要使用分叉流。就好像一个人消费了一个分叉的流一样,它和它的“父级”都会被消费,从而使任何后续的分叉都从一个空流中分叉出来。

    const partnerStreams: Array<Stream<BaseData>> = [];
    
    partners.forEach((partner: string) => {
        const partnerStream = sourceStream
            .fork()
            .filter((item: BaseData) => {
                return item.id === partner;
             }
        );
    
        partnerStreams.push(partnerStream);
    });
    
    partnerStreams.forEach((stream, index) => {
        console.log(index, stream);
        stream.toArray((foo) => {
            console.log(index, foo);
        });
    });
    

    打印出来:

    0 [ { id: 'foo', data: 'poit' }, { id: 'foo', data: 'fnord' } ]
    1 [ { id: 'bar', data: 'narf' } ]
    

    【讨论】:

      猜你喜欢
      • 1970-01-01
      • 2022-12-22
      • 2015-02-27
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      相关资源
      最近更新 更多