【问题标题】:How can an Observable be paused without losing the items emitted?如何在不丢失发出的项目的情况下暂停 Observable?
【发布时间】:2016-03-03 20:50:46
【问题描述】:

我有一个 Observable 每秒发出滴答声:

Observable.interval(0, 1, TimeUnit.SECONDS)
    .take(durationInSeconds + 1));

我想暂停这个 Observable 让它停止发射数字,然后按需恢复它。

有一些陷阱:

  • 根据Observable Javadoc,interval 运算符不支持背压
  • 关于 backpressure 的 RxJava wiki 有一节关于 Callstack 阻塞作为背压的流控制替代方案

另一种处理过度生产的 Observable 的方法是阻塞调用堆栈(停放控制过度生产的 Observable 的线程)。这有违背“反应性”和非阻塞模型的缺点处方药。但是,如果有问题的 Observable 位于可以安全阻塞的线程上,这可能是一个可行的选择。 目前 RxJava 没有公开任何操作符来促进这一点。

有没有办法暂停interval Observable?或者我应该通过一些背压支持来实现我自己的“滴答”Observable?

【问题讨论】:

标签: rx-java reactivex


【解决方案1】:

有很多方法可以做到这一点。例如,您仍然可以使用interval() 并保持另外两个状态:比如布尔标志“暂停”和一个计数器。

public static final Observable<Long> pausableInterval(
  final AtomicBoolean paused, long initial, long interval, TimeUnit unit, Scheduler scheduler) {

  final AtomicLong counter = new AtomicLong();
  return Observable.interval(initial, interval, unit, scheduler)
      .filter(tick -> !paused.get())
      .map(tick -> counter.getAndIncrement()); 
}

然后你只需在某处调用 paused.set(true/false) 即可暂停/恢复

编辑 2016-06-04

上面的解决方案有点问题。 如果我们多次重用 observable 实例,它将从最后一次取消订阅时的值开始。例如:

Observable<Long> o = pausableInterval(...)
List<Long> list1 = o.take(5).toList().toBlocking().single();
List<Long> list2 = o.take(5).toList().toBlocking().single();

虽然 list1 应该是 [0,1,2,3,4],但 list2 实际上是 [5,6,7,8,9]。 如果不希望出现上述行为,则必须将 observable 设为无状态。这可以通过 scan() 运算符来实现。 修改后的版本可能是这样的:

  public static final Observable<Long> pausableInterval(final AtomicBoolean pause, final long initialDelay, 
      final long period, TimeUnit unit, Scheduler scheduler) {

    return Observable.interval(initialDelay, period, unit, scheduler)
        .filter(tick->!pause.get())
        .scan((acc,tick)->acc + 1);
  }

或者,如果您不希望依赖于 Java 8 和 lambda,您可以使用 Java 6+ 兼容代码执行以下操作:

https://github.com/ybayk/rxjava-recipes/blob/v0.0.2/src/main/java/yurgis/rxjava/recipes/RxRecipes.java#L343-L361

【讨论】:

  • 为什么使用toBlocking操作符?没有toBlocking不起作用?或者如何在没有 toBlocking 操作符的情况下订阅这个事件?
  • Toblocking 仅用于演示或单元测试。它只是为了确保您的单元测试在可观察对象完成之前不会退出。您可以像任何其他 observable 一样异步运行它。
猜你喜欢
  • 1970-01-01
  • 1970-01-01
  • 2019-01-03
  • 2021-10-18
  • 2021-12-21
  • 2016-04-30
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
相关资源
最近更新 更多