【发布时间】:2017-02-06 17:38:34
【问题描述】:
我有一个由BaseData 对象组成的sourceStream。
我想将此流分成n-amount 个不同的流,然后根据自己的喜好过滤和转换每个BaseData 对象。
最后,我希望 n 流仅包含特定类型,并且分叉的流的长度可能会有所不同,因为将来可能会删除或添加数据。
我想我可以通过fork 设置它:
import * as _ from 'highland';
interface BaseData {
id: string;
data: string;
}
const sourceStream = _([
{id: 'foo', data: 'poit'},
{id: 'foo', data: 'fnord'},
{id: 'bar', data: 'narf'}]);
const partners = [
'foo',
'bar',
];
partners.forEach((partner: string) => {
const partnerStream = sourceStream.fork();
partnerStream.filter((baseData: BaseData) => {
return baseData.id === partner;
});
partnerStream.each(console.log);
});
我希望现在有两个流,foo-stream 包含两个元素:
{ id: 'foo', data: 'poit' }
{ id: 'foo', data: 'fnord' }
和bar-stream 包含一个元素:
{ id: 'bar', data: 'narf' }
但我得到了一个错误:
/usr/src/marketing-tasks/node_modules/highland/lib/index.js:1338
throw new Error(
^
Error: Stream already being consumed, you must either fork() or observe()
at Stream._addConsumer (/usr/src/marketing-tasks/node_modules/highland/lib/index.js:1338:15)
at Stream.consume (/usr/src/marketing-tasks/node_modules/highland/lib/index.js:1500:10)
at Stream.each (/usr/src/marketing-tasks/node_modules/highland/lib/index.js:1774:18)
at partners.forEach (/usr/src/marketing-tasks/dist/bin/example.js:17:19)
at Array.forEach (native)
at Object.<anonymous> (/usr/src/marketing-tasks/dist/bin/example.js:12:10)
at Module._compile (module.js:570:32)
at Object.Module._extensions..js (module.js:579:10)
at Module.load (module.js:487:32)
at tryModuleLoad (module.js:446:12)
如何将一个流分叉成多个流?
我也尝试链接调用,但我只得到一个流的结果:
partners.forEach((partner: string) => {
console.log(partner);
const partnerStream = sourceStream
.fork()
.filter((item: BaseData) => {
return item.id === partner;
});
partnerStream.each((item: BaseData) => {
console.log(item);
});
});
仅打印:
foo
{ id: 'foo', data: 'poit' }
{ id: 'foo', data: 'fnord' }
bar
而不是预期的:
foo
{ id: 'foo', data: 'poit' }
{ id: 'foo', data: 'fnord' }
bar
{id: 'bar', data: 'narf'}
也可能是我误解了fork 的全部含义。根据its doc entry:
Stream.fork() 分叉一个流,允许你添加额外的消费者 具有共同的背压。分叉给多个消费者的流将 仅以最慢的消费者可以最快的速度从其源中提取值 处理它们。
注意:不要依赖分叉之间一致的执行顺序。 此转换仅保证所有分叉将处理值 foo 在任何将处理第二个值栏之前。它不保证 分叉处理 foo 的顺序。
提示:在分叉(或 使用这样做的库)。 由于相同的值将被传递给 每个分叉,在一个分叉中所做的更改将在任何分叉中可见 在它之后执行。 加上不一致的执行顺序,并且 您最终可能会遇到细微的数据损坏错误。如果需要修改 任何值,您都应该复制并修改副本。
弃用警告:目前可以在之后分叉流 使用它(例如,通过转换)。这将不再可能 在下一个主要版本中。如果你要分叉一个流,总是 叫 fork 就可以了。
所以不是“如何分叉流?”我的实际问题可能是:如何将高原溪流即时复制到不同的溪流中?
【问题讨论】:
标签: javascript typescript stream highland.js