【发布时间】:2017-07-18 18:19:20
【问题描述】:
根据Multicasting 上的 RxJS 5 手册部分
...我们可以使用 ConnectableObservable 的 refCount() 方法(引用计数),它返回一个跟踪它有多少订阅者的 Observable。当订阅者数量从 0 增加到 1 时,它会为我们调用 connect(),从而开始共享执行。只有当订阅者数量从 1 减少到 0 时才会完全取消订阅,停止进一步执行。
我想了解是否可以挂钩到这些事件中的每一个并执行一些逻辑,最好是在源 observable 的 connect() 或 unsubscribe() 发生之前,但即使在事实发生之后也是可以接受的。
如果在使用 refCount() 运算符时无法做到这一点,如果您能提供一个如何使用自定义运算符实现此目的的示例,我们将不胜感激。
我想也许我可以以某种方式使用来自 do(nextFn,errFn,completeFn) 的 completeFn 来挂钩,但似乎没有像下面的 sn-p 所示那样工作。
var source = Rx.Observable.interval(500)
.do(
(x) => console.log('SOURCE emitted ' + x),
(err) => console.log('SOURCE erred ' + err),
() => console.log('SOURCE completed ')
);
var subject = new Rx.Subject();
var refCounted = source.multicast(subject).refCount();
var subscription1, subscription2, subscriptionConnect;
// This calls `connect()`, because
// it is the first subscriber to `refCounted`
console.log('observerA subscribed');
subscription1 = refCounted.subscribe({
next: (v) => console.log('observerA: ' + v)
});
setTimeout(() => {
console.log('observerB subscribed');
subscription2 = refCounted.subscribe({
next: (v) => console.log('observerB: ' + v)
});
}, 600);
setTimeout(() => {
console.log('observerA unsubscribed');
subscription1.unsubscribe();
}, 1200);
// This is when the shared Observable execution will stop, because
// `refCounted` would have no more subscribers after this
setTimeout(() => {
console.log('observerB unsubscribed');
subscription2.unsubscribe();
}, 2000);
<script src="https://unpkg.com/@reactivex/rxjs@5.0.3/dist/global/Rx.js"></script>
【问题讨论】:
-
您希望从您制作的示例中得到什么输出?我想这就是你要找的。span>
标签: javascript functional-programming rxjs observable rxjs5