【问题标题】:On demand execution of hot Observable按需执行热 Observable
【发布时间】:2015-12-25 12:15:01
【问题描述】:

举个冷酷的例子:

Observable<Integer> cold = Observable.create(subscriber -> {
  try {
    for (int i = 0; i <= 42; i++) {

      // avoid doing unnecessary work
      if (!subscriber.isUnsubscribed()) {
        break;
      }

      subscriber.onNext(i);
    }
    subscriber.onCompleted();
  } catch (Throwable cause) {
    subscriber.onError(cause);
  }
});

它开始为每个新订阅者从头开始执行:

// starts execution
cold.subscribe(...)

如果订阅者提前取消订阅,则可以停止执行:

// stops execution
subscription.unsubscribe();

现在,如果我们有一些实际的业务逻辑而不是示例 for 循环(不需要为每个订阅者重播,而是实时),那么我们正在处理 hot observable...

PublishSubject<Integer> hot = PublishSubject.create();

Thread thread = new Thread(() -> {
  try {
    for (int i = 0; i < 42; i++) {
      // how to avoid unnecessary work when no one is subscribed?
      hot.onNext(i);
    }
    hot.onCompleted();
  } catch (Throwable cause) {
    hot.onError(cause);
  }
});

当我们希望它启动时,我们可能会这样做

// stats work (although no one is subscribed) 
thread.start();

因此第一个问题:只有在第一个观察者订阅时才开始工作?(可能是可连接的观察者?)

还有一个重要的问题:当最后一个订阅者退订时如何停止工作?(我不知道如何访问该主题的当前订阅,并且希望找到没有共享全局状态的干净解决方案,如果存在这样的解决方案)

我能想到的一个解决方案是使用自定义运营商来提升主题,该运营商将管理订阅者...

【问题讨论】:

    标签: java reactive-programming rx-java reactive-streams


    【解决方案1】:

    参见运算符 refCount - http://reactivex.io/documentation/operators/refcount.html。这个 Operator 把你的 Observable 变成 ConnectableObservable,当第一个订阅者订阅时连接它,当没有更多订阅时断开连接

    【讨论】:

      猜你喜欢
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 2019-11-13
      • 1970-01-01
      • 1970-01-01
      • 2023-03-05
      • 1970-01-01
      • 2018-04-08
      相关资源
      最近更新 更多