【问题标题】:RxJS: How to merge observables dynamicallyRxJS:如何动态合并可观察对象
【发布时间】:2020-01-26 20:41:16
【问题描述】:

我想将 2 个或多个 observable 动态组合成一个组合 observable。

我了解如何将两个已经存在的 observable 与 merge 结合起来,但是当需要动态添加额外的 observable 时(例如超时后),我该如何解决“合并”问题?

此外,combinedStream$ 上的现有订阅在“即时”合并另一个 observable 时不应丢失。

这是我目前所拥有的:

const action1$ = interval(1000).pipe(map(data => 'Action1 value:' + data));
const action2$ = interval(1000).pipe(map(data => 'Action2 value:' + data));

const combinedStream$ = merge(action1$, action2$);
combinedStream$.subscribe(data => console.log('Combined Stream Output:', data));

// Add another observable after some time...
setTimeout(() => {
  const action3$ = interval(1000).pipe(map(data => 'Action3 value:' + data));
  // How add this action3$ to the combined stream ?
}, 1000);

这是我的堆栈闪电战:https://stackblitz.com/edit/rxjs-s2cyzj

【问题讨论】:

标签: rxjs


【解决方案1】:

在处理该用例时,最简单的做法是使用 observable... 的 observable,然后使用更高阶的函数,例如 concatAllswitchmergeAll...

const action1$: Observable<string> = interval(2000).pipe(
  map(data => "Action1 value:" + data)
);
const action2$: Observable<string> = interval(2000).pipe(
  map(data => "Action2 value:" + data)
);
const action3$: Observable<string> = interval(2000).pipe(
  map(data => "Action3 value:" + data)
);

const mainStream$$: Subject<Observable<string>> = new Subject();

const combinedStream$ = mainStream$$.pipe(mergeAll());

combinedStream$.subscribe(data => console.log("Combined Stream Output:", data));

mainStream$$.next(action1$);
mainStream$$.next(action2$);
// Add another stream after some time...
setTimeout(() => {
  mainStream$$.next(action3$);
}, 1000);

演示:https://stackblitz.com/edit/rxjs-stwvtc?file=index.ts

【讨论】:

  • 如何动态删除 Observable?
  • 和经典的 observable 一样吗? takeUntil, takeWhile, first, take, ... 使用任何可以关闭一个流并将其应用于您要关闭的流的运算符?
猜你喜欢
  • 2016-06-18
  • 1970-01-01
  • 2019-10-18
  • 1970-01-01
  • 2018-02-04
  • 2019-02-15
  • 2020-09-22
  • 1970-01-01
  • 1970-01-01
相关资源
最近更新 更多