【问题标题】:How can I achieve a shareReplay with reconnection?如何通过重新连接实现 shareReplay?
【发布时间】:2016-04-20 19:29:27
【问题描述】:

在下面的代码中,我创建了一个简单的 observable,它产生一个值然后完成。然后我分享那个可观察的重播最后一个项目并订阅 3 次。第一次紧随其后,第二次在产生值之前,第三次在产生值并且 observable 完成之后。

let i = 0;
let obs$ = Rx.Observable.create(obs => {
  console.log('Creating observable');
  i++;
  setTimeout(() => {
     obs.onNext(i);
     obs.onCompleted();
  }, 2000);
}).shareReplay(1);

obs$.subscribe(
  data => console.log(`s1: data = ${data}`),
  () => {},
  () => console.log('finish s1')
);

setTimeout( () => {
  obs$.subscribe(
    data => console.log(`s2: data = ${data}`),
    () => {},
    () => console.log('finish s2')

  );  
}, 1000);

setTimeout( () => {
  obs$.subscribe(
    data => console.log(`s3: data = ${data}`),
    () => {},
    () => console.log('finish s3')

  );  
}, 6000);

你可以execute this on jsbin

这会产生以下大理石图

Actual
s1: -----1$
s2:   \--1$
s3:           \1$

但我会期待

Expected
s1: -----1$
s2:   \--1$
s3:           \----2$

我可以理解为什么有人想要第一个行为,但我的理由是,与此示例不同的是,我返回一个数字,我可能返回一个易受取消订阅行为影响的对象,例如数据库连接.如果上面的弹珠图表示一个数据库连接,在 dispose 方法中我调用db.close(),在第三个订阅上我会遇到一个异常,因为我收到了一个已发布的数据库处理程序作为值。 (因为当第二次订阅完成时 refCount = 0 并且源被释放)。

这个例子还有一个奇怪的地方是,即使它正在解决 第一个值并在之后完成,它订阅源两次(正如您可以通过重复的“创建可观察”看到的那样)

我知道this github issue 谈论过这个,但我缺少的是:

如何实现(在 RxJs4 和 5 中)共享的 observable,如果源 observable 尚未完成,则可以重播最后一个项目,如果已​​完成 (refCount = 0),则重新创建 observable。

在 RxJs5 中,我认为 share 方法解决了我的问题的重新连接部分,但不是共享部分。

在 RxJs4 中我一无所知

如果可能的话,我想使用现有的运算符或主题来解决这个问题。我的直觉告诉我,我必须用这种逻辑创建一个不同的主题,但我还没有完全做到。

【问题讨论】:

  • 当您说该对象容易受到退订行为影响时,我不清楚您的意思是什么,您是说您正在发送一个嵌套在另一个 Observable 中的 Observable
  • 没有必要。我的意思是,如果我创建一个Observable 来连接到数据库,例如使用Observable.create,在返回函数中我会调用db.close(),意思是,当你完成了这个可观察的,关闭数据库连接。如果我共享这个 observable(为了避免每个查询有一个连接),当 refCount 变为 0 时,observable 将处理,调用db.close() 并且下次有人订阅共享的 Observable 时,它​​将返回关闭的数据库处理程序重建一个。

标签: javascript reactive-programming rxjs rxjs5


【解决方案1】:

关于shareReplay的一点:

shareReplay 在返回的 observable 的剩余生命周期内保持相同的底层 ReplaySubject 实例。

一旦ReplaySubject 完成,您就不能再向其中添加任何值,但它仍会重播。所以……

  1. 您第一次订阅 observable 并且超时开始。这会将i0 增加到1
  2. 您第二次订阅 observable,超时时间已经过去。
  3. 超时回调触发并发送onNext(i),然后发送onCompleted()
  4. onCompleted() 信号完成了 shareReplay 内的 ReplaySubject,这意味着从现在开始,共享的 observable 将简单地重放它的值(即 1)并完成。

关于共享 observables 的一些概述:

另一个单独的问题是,由于您共享了 observable,它只会调用一次订阅者函数。这意味着i 只会增加一次。所以即使你没有onCompleted 并杀死你的底层ReplaySubject,你最终也不会将它增加到2

这不是 RxJS 5

一个快速的判断方法是onNext vs next。您当前在您的示例中使用 RxJS 4,但是您已经用 RxJS 5 标记了它,并且您在 RxJS 5 中发现了一个问题。RxJS 5 是测试版,是一个完全重写 RxJS 4 的新版本。 API 更改主要是为了匹配 es-observable proposal which is currently at stage 1

更新示例

I've updated your example to give you your expected results

基本上,您希望在前两个调用中使用 observable 的共享版本,在第三个调用中使用原始 observable。

let i = 0;
let obs$ = Rx.Observable.create(obs => {
  console.log('Creating observable');
  i++;
  setTimeout(() => {
     obs.onNext(i);
     obs.onCompleted();
  }, 2000);
})


let shared$ = obs$.shareReplay(1);

shared$.subscribe(
  data => console.log(`s1: data = ${data}`),
  () => {},
  () => console.log('finish s1')
);

setTimeout( () => {
  shared$.subscribe(
    data => console.log(`s2: data = ${data}`),
    () => {},
    () => console.log('finish s2')

  );  
}, 1000);

setTimeout( () => {
  obs$.subscribe(
    data => console.log(`s3: data = ${data}`),
    () => {},
    () => console.log('finish s3')

  );  
}, 6000);

不相关

另外,小提示:请务必为调用 clearTimeout 的自定义 observable 返回取消语义。

【讨论】:

  • 感谢您的回复!遗憾的是它并没有解决我的问题:(。我知道 rxjs5 和 4 之间的区别,已经关注该项目一段时间了。继续努力!我无法升级到 RxJs 5,直到我提交的这个问题得到修复或者我们切换到使用模块加载器github.com/ReactiveX/rxjs/issues/1481。所以我必须在 v4 中解决这个问题,但也想在 v5 中知道。这个例子是问题的简化。我有一个 angular1 服务提供共享的 observable,所以我不能说何时使用 shared 。是否有一个 lib/op 可以共享和更改底层主题?
  • 在完成课程时更改基础主题。如果没有,我想知道你如何使用 observables 来做类似数据库池的事情。从我之前提到的问题中,我了解您提出的更改是如何具有可重新连接的 observable,但我没有得到的是如何更改 ReplaySubject。或者我可能需要创建一个 ReplayUntilCompletionSubject。
猜你喜欢
  • 2012-11-04
  • 2021-04-18
  • 1970-01-01
  • 2017-10-02
  • 1970-01-01
  • 1970-01-01
  • 2011-10-07
  • 1970-01-01
  • 2016-02-17
相关资源
最近更新 更多