【问题标题】:RxJava - How to stop (and resume) a Hot Observable (interval)?RxJava - 如何停止(和恢复)Hot Observable(间隔)?
【发布时间】:2017-04-25 12:34:27
【问题描述】:

我有以下 Hot Observable:

hotObservable = Observable.interval(0L, 1L, TimeUnit.SECONDS)
                          .map((t) -> getCurrentTimeInMillis()))

但是,我找不到阻止它的好方法。我能够使用takeWhileboolean 标志(runTimer)部分解决这个问题:

Observable.interval(0L, 1L, TimeUnit.SECONDS)
          .takeWhile((t) -> runTimer)
          .map((t) -> getCurrentTimeInMillis()))

不过,我不喜欢这种方法有两点:

  1. 我必须保留标志 runTimer,这是我不想要的。
  2. 一旦runTimer 变成false,Observable 就完成了,这意味着如果我想再次发射,我需要创建一个新的Observable。我不想要那个。我只想让 Observable 停止发射项目,直到我告诉它重新开始。

我希望是这样的:

hotObservable.stop();
hotObservable.resume();

这样我就不需要保留任何标志,并且 observable 始终处于活动状态(尽管它可能不会发出事件)。

我怎样才能做到这一点?

【问题讨论】:

  • 应观察到的停止生产项目,还是继续运行但停止发射它们?
  • @npace 停止生产将是理想的,但我也很高兴只停止发射。两者都适用于我的情况。
  • Kiskae 的回答是完全有效的,但仅供参考。如果您将先前解决方案中的 takeWhile 运算符替换为 filter 运算符,您将摆脱第二个缺点......跨度>

标签: android rx-java reactive-programming stoppropagation rx-java2


【解决方案1】:

一种可能的方法是使用 BehaviorSubject 和 switchMap:

BehaviorSubject<Boolean> subject = BehaviorSubject.create(true);
hotObservable = subject.distinctUntilChanged().switchMap((on) -> {
    if (on) {
        return Observable.interval(0L, 1L, TimeUnit.SECONDS);
    } else {
        return Observable.never();
    }
}).map((t) -> getCurrentTimeInMillis());

通过向主题发送布尔值,可以控制 observable 的输出。 subject.onNext(true) 将导致使用该主题创建的任何可观察对象开始发射值。 subject.onNext(false) 禁用该流程。

switchMap 在关闭时负责处理底层的 observables。它还使用distinctUntilChanged 来确保它不会进行不必要的切换。

【讨论】:

    【解决方案2】:

    你可以用这个

    Observable.interval(3, TimeUnit.SECONDS)
            .takeUntil(stop)
            .subscribe(new MyObserver());
    
    Thread.sleep(10000);
    stop.onNext(-1);
    

    【讨论】:

      猜你喜欢
      • 1970-01-01
      • 2018-04-08
      • 1970-01-01
      • 2019-09-08
      • 1970-01-01
      • 2013-05-29
      • 1970-01-01
      • 1970-01-01
      • 2017-04-11
      相关资源
      最近更新 更多