【问题标题】:How to split a stream and recombine the sub-streams final results in RxJs如何在 RxJs 中拆分流并重新组合子流最终结果
【发布时间】:2019-07-11 08:27:26
【问题描述】:

我有一个可以发出两种类型消息的源流。我想将它们分成两个单独的流,一旦原始流完成,重新组合它们最终发出的值(如果不存在,则为 undefined)。

例如

const split1$ = source$.pipe(
     filter(m) => m.kind === 1, 
     mergeMap(m) => someProcessing1());
const split2$ = source$.pipe(
     filter(m) => m.kind === 2, 
     mergeMap(m) => someProcessing2());
forkJoin(split1$, split2$).subscribe(
(output1, output2) => console.log(output1, output2));

问题在于,没有任何东西可以保证 split1$ 和 split2$ 都会发出值。如果发生这种情况,forkJoin 永远不会发出。 每当源流完成时,我可以用什么替换 forkJoin 来发出一个值。

【问题讨论】:

标签: rxjs


【解决方案1】:

关于分流: https://www.learnrxjs.io/operators/transformation/partition.html

关于“完成时发射”,您不能只使用complete 回调吗? .subscribe(() => console.log('Emitted"), null, () => console.log('Completed'));

否则,您可以使用startWith 运算符来确保发出某些内容。

const [evens, odds] = source.pipe(partition(val => val % 2 === 0));
evens = evens.pipe(startWith(undefined)); // This will emit undefined before everything, so forkJoin will surely emit

forkJoin 构造函数中添加startWith

forkJoin(evens.pipe(startWith(undefined)), odds.pipe(startWith(undefined)))
  .subscribe(console.log))

【讨论】:

  • 我不能使用完整的回调,因为我想在所有流完成后使用结果数据 StartWith 是一个有效的选项,但会给我的代码带来噪音,因为我必须过滤如果我在拆分流管道中进行任何登录。
  • 只需在startWith 之前登录或在forkJoin 构造函数中添加startWith,这样您就不会更改初始分区流。编辑了答案。
猜你喜欢
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 2019-06-11
  • 2023-04-03
  • 1970-01-01
相关资源
最近更新 更多