【问题标题】:How to force publishReplay() to resubscribe?如何强制 publishReplay() 重新订阅?
【发布时间】:2017-02-21 14:29:37
【问题描述】:

我对 RxJS 很陌生,这是我的故事。我想要一个可观察的“会话”,因此新订阅者将始终获得当前会话,然后是所有新会话(如果它们可以出现)。所以我写了这样的东西:

var session = Rx.Observable.from([0,1,3])
  .do(x => console.log("Useful job"))
  .publishReplay(1)
  .refCount();

var subscr1 = session.subscribe( x => {
    console.log("sub1 = " +x)
    //subscr1.unsubscribe();
})

console.log("Completed");
subscr1.unsubscribe();

session.subscribe( x => {
  console.log("sub2 = " +x)
});

输出是:

Useful job
sub1 = 0
Useful job
sub1 = 1
Useful job
sub1 = 3
Completed
sub2 = 3

为什么 sub2 订阅时没有有用的工作?我预计它会完全冷!

【问题讨论】:

    标签: rxjs reactive-programming rxjs5


    【解决方案1】:

    .publishReplay() 的第一个参数是它将重播的项目数,因此如果您知道始终只能期望 3 个,则可以使用 .publishReplay(3)

    如果您想重播整个序列,您可以先使用 toArray() 收集其所有项目,将数组存储在 ReplaySubject 中,然后将数组展平为单个值。

    var session = Rx.Observable.from([0,1,3])
      .toArray()
      .publishReplay(1)
      .refCount()
      .concatAll();
    

    请注意,toArray() 在其源代码完成之前不会发出任何内容。

    编辑:我知道现在有什么问题了。

    您的方法是正确的,但问题出在Observable.frompublishReplay() 中使用的主题。源 Observable 发出三个项目并发送complete 通知。当主题收到completeerror 通知时,它会将自己标记为stopped,并且不会再重新发送任何项目。这正是您的示例中发生的情况。

    如果您手动发出值而不发送complete,它将按预期工作。

    // var session = Rx.Observable.from([0,1,3])
    var session = Rx.Observable.create(subscriber => {
        subscriber.next(0);
        subscriber.next(1);
        subscriber.next(3);
      })
      .do(x => console.log("Useful job"))
      .publishReplay(1)
      .refCount();
    

    这会打印到控制台:

    Useful job
    sub1 = 0
    Useful job
    sub1 = 1
    Useful job
    sub1 = 3
    Completed
    sub2 = 3
    Useful job
    sub2 = 0
    Useful job
    sub2 = 1
    Useful job
    sub2 = 3
    

    看到同样的问题:Rx.Subject loses events

    【讨论】:

    • 抱歉,这只是一个示例。原来的“会话”实际上相当复杂。它连接到 EventSource,做很多事情。然后发出“会话”对象。在某些时候,它可以重新连接并发出“新会话”对象。通常它仅在“注销事件”时结束。现在在下一个“登录”事件中,我希望它在第二次重新连接时重复可观察到的会话。而且不是这样的。
    • @norekhov 你能做一个模拟这个问题的演示吗?我可能不明白事件的顺序是什么,而 session 是什么。
    • 想象一下 Rx.Observable.from([0,1,3]) 是一个很少发出新会话的可观察对象。在某个时刻,会话“3”变为非活动状态。第二个 observable 收到“3”,而我希望重新启动 session observable。
    • 应该像 shareReplay 这样的东西。
    • 恐怕我没跟上。会话本身就是 Observable?所以你有 Observable 发射 Observables 而不是 Rx.Observable.from([0,1,3])
    【解决方案2】:

    我终于找到了解决办法。

    将 .publishReplay(1) 替换为 .multicast(() => new Rx.ReplaySubject(1))。所以基本上我用subjectFactory替换了一个主题。

    然后它按预期工作。所以当所有人都退订时,它变得绝对冷。

    【讨论】:

    • 请注意,现在每个订阅者都有自己的 ReplaySubject 实例(ReplaySubject 不像 .publishReplay(1) 那样在所有订阅者之间共享。
    • @martin 不,他们没有。每个新订阅者都会收到最后一个传递的值,这意味着 ReplaySubject 是共享的。
    猜你喜欢
    • 2017-12-03
    • 1970-01-01
    • 2020-04-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2021-06-21
    • 2022-08-19
    • 2021-08-25
    相关资源
    最近更新 更多