【发布时间】:2016-04-20 19:29:27
【问题描述】:
在下面的代码中,我创建了一个简单的 observable,它产生一个值然后完成。然后我分享那个可观察的重播最后一个项目并订阅 3 次。第一次紧随其后,第二次在产生值之前,第三次在产生值并且 observable 完成之后。
let i = 0;
let obs$ = Rx.Observable.create(obs => {
console.log('Creating observable');
i++;
setTimeout(() => {
obs.onNext(i);
obs.onCompleted();
}, 2000);
}).shareReplay(1);
obs$.subscribe(
data => console.log(`s1: data = ${data}`),
() => {},
() => console.log('finish s1')
);
setTimeout( () => {
obs$.subscribe(
data => console.log(`s2: data = ${data}`),
() => {},
() => console.log('finish s2')
);
}, 1000);
setTimeout( () => {
obs$.subscribe(
data => console.log(`s3: data = ${data}`),
() => {},
() => console.log('finish s3')
);
}, 6000);
这会产生以下大理石图
Actual
s1: -----1$
s2: \--1$
s3: \1$
但我会期待
Expected
s1: -----1$
s2: \--1$
s3: \----2$
我可以理解为什么有人想要第一个行为,但我的理由是,与此示例不同的是,我返回一个数字,我可能返回一个易受取消订阅行为影响的对象,例如数据库连接.如果上面的弹珠图表示一个数据库连接,在 dispose 方法中我调用db.close(),在第三个订阅上我会遇到一个异常,因为我收到了一个已发布的数据库处理程序作为值。 (因为当第二次订阅完成时 refCount = 0 并且源被释放)。
这个例子还有一个奇怪的地方是,即使它正在解决 第一个值并在之后完成,它订阅源两次(正如您可以通过重复的“创建可观察”看到的那样)
我知道this github issue 谈论过这个,但我缺少的是:
如何实现(在 RxJs4 和 5 中)共享的 observable,如果源 observable 尚未完成,则可以重播最后一个项目,如果已完成 (refCount = 0),则重新创建 observable。
在 RxJs5 中,我认为 share 方法解决了我的问题的重新连接部分,但不是共享部分。
在 RxJs4 中我一无所知
如果可能的话,我想使用现有的运算符或主题来解决这个问题。我的直觉告诉我,我必须用这种逻辑创建一个不同的主题,但我还没有完全做到。
【问题讨论】:
-
当您说该对象容易受到退订行为影响时,我不清楚您的意思是什么,您是说您正在发送一个嵌套在另一个
Observable中的Observable? -
没有必要。我的意思是,如果我创建一个
Observable来连接到数据库,例如使用Observable.create,在返回函数中我会调用db.close(),意思是,当你完成了这个可观察的,关闭数据库连接。如果我共享这个 observable(为了避免每个查询有一个连接),当 refCount 变为 0 时,observable 将处理,调用db.close()并且下次有人订阅共享的 Observable 时,它将返回关闭的数据库处理程序重建一个。
标签: javascript reactive-programming rxjs rxjs5