【问题标题】:Create a blocking behavior between subscriptions在订阅之间创建阻塞行为
【发布时间】:2022-01-29 00:49:29
【问题描述】:

我已经使用 defer 从一个 Promise 创建了一个 observable。

let connect$ = defer(() => connectAsync());

这将在我每次订阅connect$ 时执行connectAsync 函数。 但我需要它等到之前的订阅完成,直到新的订阅开始。

这是一些带有 cmets 的代码,可以使我想要更清楚的行为。

connect$.subscribe(); // --|>
connect$.subscribe(); //    --|>
connect$.subscribe(); //       --|>

是否有任何 rxjs 运算符或主题可以用来实现行为?

【问题讨论】:

    标签: promise rxjs


    【解决方案1】:

    简短的回答:

    不,RxJS 库中没有这样的运算符

    长答案:

    关于 RxJS 柯里化操作符方式的一个很酷的事情是,创建自己的操作符相当轻松,并且 RxJS 会像处理任何其他操作符一样对待它。

    例如:

    function shareQueue<T>(): MonoTypeOperatorFunction<T>{
      let buffer: Observable<T> = EMPTY;
      return (s: Observable<T>) => defer(() => {
        buffer = concat(
          buffer.pipe(ignoreElements()),
          s
        ).pipe(
          share({
            resetOnError: () => EMPTY;
            resetOnComplete: () => EMPTY;
            resetOnRefCountZero: () => EMPTY;
          })
        );
        return buffer;
      });
    }
    

    然后您可以像这样使用它来获得(我认为的)您所追求的:

    const connect$ = defer(connectAsync).pipe(
      shareQueue()
    );
    
    connect$.subscribe();
    connect$.subscribe();
    connect$.subscribe();
    

    【讨论】:

    • 这是解决这个问题的好方法。刚刚测试过它,就像一个魅力。很好地使用了缓冲区 observable 和 ignoreElements。在测试时,我看到了 share 运算符在这个 curried 运算符中的重要性。你能解释一下它在这里的作用吗?
    • @blueshell 当然,每个订阅者都必须等待缓冲区清除。 Observable 默认情况下不共享/热/多播,因为我们可能需要任意数量的订阅者来使缓冲区多播。否则,缓冲区将为每个订阅者多次重新订阅源。
    • 啊,是的,现在我明白了。它需要共享,因为在共享缓冲区过期之前,connectAsync observable 不会被执行。否则,connectAsync observable 的每个订阅都将被添加到缓冲区中,并且它将执行与订阅者一样多的次数。非常感谢!
    • @blueshell 是的,另一种思考方式是我们有效地创建了一个链表样式缓冲区。 share 确保我们有一个指向前一个节点的链接(而不是指向 每个 前一个节点的链接)。由于节点自己管理(完成后取消订阅他们的prev),这使得整个事情看起来很简单:)
    • 那行不通。就像share 操作符shareReplay 在抛出错误时也会重新订阅源。我读过另一篇关于此的so post。我确实认为该解决方案非常难看,但现在可以了。稍后我可以将更漂亮的解决方案与ShareConfig 对象一起使用。
    猜你喜欢
    • 1970-01-01
    • 2020-09-22
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多