【问题标题】:Chaining two dependent observables in rxjs在 rxjs 中链接两个依赖的 observables
【发布时间】:2021-09-13 21:31:56
【问题描述】:

我有一个发出一系列消息的 observable,比如 obs1。然后是第二个 observable,obs2,它需要 obs1 发出的最后一条消息中的一些数据,并发出另一系列消息。我想“链接”这两个 observables 以产生一个 observable obs3,它从 obs1 和 obs2 串行发出所有消息。

到目前为止我想出的解决方案是:

obs3 = concat(
  obs1,
  obs1.pipe(
    last(),
    concatMap(lastMessage => obs2(lastMessage)),
);

但是这个有obs1被执行(订阅)2次的缺陷。

有没有更直接的方法来实现这一点?像concatMapWithSelf() 这样的操作符可以这样工作:

obs3 = obs1.pipe(
  concatMapWithSelf(lastMessage => obs2(lastMessage)),
);

谢谢!

【问题讨论】:

    标签: rxjs observable concatenation


    【解决方案1】:

    听起来您可以使用 ConnectableObservable。在 RxJS 7 中,我相信使用 multicast() 会更容易和更好地阅读,但在 RxJS 8 中将被弃用,所以唯一的选择可能是用 connectable() 包装源 Observable,然后手动调用 connect()

    const obs1 = connectable(
      defer(() => {
        console.log('new subscription');
        return of('v1', 'v2', 'v3', 'v4');
      })
    );
    
    const obs2 = msg => of(msg);
    
    const obs3 = merge(
      obs1,
      obs1.pipe(
        last(),
        concatMap(lastMessage => obs2(lastMessage))
      )
    );
    
    obs3.subscribe(console.log);
    
    obs1.connect();
    

    现场演示:https://stackblitz.com/edit/rxjs-2uheg4?devtoolsheight=60

    如果obs1 始终是异步的,那么您可能可以使用share(),但同步源的行为会有所不同,因此使用 ConnectableObservable 应该更安全。

    【讨论】:

    • 很棒的答案。只是想知道为什么需要defer
    • defer 仅用于记录每个新订阅的消息。您不需要在实际应用中使用它。
    猜你喜欢
    • 2021-01-16
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2019-06-14
    • 1970-01-01
    • 2016-09-07
    相关资源
    最近更新 更多