【问题标题】:Execute logic when RxJS 5 refCount() connects to or unsubscribes from sourceRxJS 5 refCount() 连接或取消订阅源时执行逻辑
【发布时间】:2017-07-18 18:19:20
【问题描述】:

根据Multicasting 上的 RxJS 5 手册部分

...我们可以使用 ConnectableObservable 的 refCount() 方法(引用计数),它返回一个跟踪它有多少订阅者的 Observable。当订阅者数量从 0 增加到 1 时,它会为我们调用 connect(),从而开始共享执行。只有当订阅者数量从 1 减少到 0 时才会完全取消订阅,停止进一步执行。

我想了解是否可以挂钩到这些事件中的每一个并执行一些逻辑,最好是在源 observable 的 connect()unsubscribe() 发生之前,但即使在事实发生之后也是可以接受的。

如果在使用 refCount() 运算符时无法做到这一点,如果您能提供一个如何使用自定义运算符实现此目的的示例,我们将不胜感激。

我想也许我可以以某种方式使用来自 do(nextFn,errFn,completeFn) 的 completeFn 来挂钩,但似乎没有像下面的 sn-p 所示那样工作。

var source = Rx.Observable.interval(500)
  .do(
    (x) => console.log('SOURCE emitted ' + x),
    (err) => console.log('SOURCE erred ' + err),
    () => console.log('SOURCE completed ')
  );
var subject = new Rx.Subject();
var refCounted = source.multicast(subject).refCount();
var subscription1, subscription2, subscriptionConnect;

// This calls `connect()`, because
// it is the first subscriber to `refCounted`
console.log('observerA subscribed');
subscription1 = refCounted.subscribe({
  next: (v) => console.log('observerA: ' + v)
});

setTimeout(() => {
  console.log('observerB subscribed');
  subscription2 = refCounted.subscribe({
    next: (v) => console.log('observerB: ' + v)
  });
}, 600);

setTimeout(() => {
  console.log('observerA unsubscribed');
  subscription1.unsubscribe();
}, 1200);

// This is when the shared Observable execution will stop, because
// `refCounted` would have no more subscribers after this
setTimeout(() => {
  console.log('observerB unsubscribed');
  subscription2.unsubscribe();
}, 2000);
<script src="https://unpkg.com/@reactivex/rxjs@5.0.3/dist/global/Rx.js"></script>

【问题讨论】:

  • 您希望从您制作的示例中得到什么输出?我想这就是你要找的。​​span>

标签: javascript functional-programming rxjs observable rxjs5


【解决方案1】:

您可以在实际流之前使用.do(null,null, onComplete) 和在完成/取消订阅之后使用.finally() 的组合,以便在订阅之前和完成/取消订阅之后有事件:

const source = Rx.Observable.empty()
  .do(null,null, () => console.log('subscribed'))
  .concat(Rx.Observable.interval(500))
  .finally(() => console.log('unsubscribed'))
  .publish().refCount();

const sub1 = source
  .take(5)
   .subscribe(
     val => console.log('sub1 ' + val),
     null, 
     () => console.log('sub1 completed')
   );
const sub2 = source
  .take(3)
  .subscribe(
    val => console.log('sub2 ' + val), 
    null, 
    () => console.log('sub2 completed')
  );

// simulate late subscription setting refCount() from 0 to 1 again                      
setTimeout(() => {
  source
    .take(1)
    .subscribe(
      val => console.log('late sub3 val: ' + val),
      null, 
      () => console.log('sub3 completed')
    );
 
}, 4000);
<script src="https://cdnjs.cloudflare.com/ajax/libs/rxjs/5.4.2/Rx.js"></script>

【讨论】:

    猜你喜欢
    • 1970-01-01
    • 2016-12-20
    • 1970-01-01
    • 2017-08-24
    • 2017-03-26
    • 2020-10-28
    • 2020-08-11
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多