有很多方法可以做到这一点。例如,您仍然可以使用interval() 并保持另外两个状态:比如布尔标志“暂停”和一个计数器。
public static final Observable<Long> pausableInterval(
final AtomicBoolean paused, long initial, long interval, TimeUnit unit, Scheduler scheduler) {
final AtomicLong counter = new AtomicLong();
return Observable.interval(initial, interval, unit, scheduler)
.filter(tick -> !paused.get())
.map(tick -> counter.getAndIncrement());
}
然后你只需在某处调用 paused.set(true/false) 即可暂停/恢复
编辑 2016-06-04
上面的解决方案有点问题。
如果我们多次重用 observable 实例,它将从最后一次取消订阅时的值开始。例如:
Observable<Long> o = pausableInterval(...)
List<Long> list1 = o.take(5).toList().toBlocking().single();
List<Long> list2 = o.take(5).toList().toBlocking().single();
虽然 list1 应该是 [0,1,2,3,4],但 list2 实际上是 [5,6,7,8,9]。
如果不希望出现上述行为,则必须将 observable 设为无状态。这可以通过 scan() 运算符来实现。
修改后的版本可能是这样的:
public static final Observable<Long> pausableInterval(final AtomicBoolean pause, final long initialDelay,
final long period, TimeUnit unit, Scheduler scheduler) {
return Observable.interval(initialDelay, period, unit, scheduler)
.filter(tick->!pause.get())
.scan((acc,tick)->acc + 1);
}
或者,如果您不希望依赖于 Java 8 和 lambda,您可以使用 Java 6+ 兼容代码执行以下操作:
https://github.com/ybayk/rxjava-recipes/blob/v0.0.2/src/main/java/yurgis/rxjava/recipes/RxRecipes.java#L343-L361