【问题标题】:How to stop and resume Observable.interval emiting ticks如何停止和恢复 Observable.interval 发射滴答声
【发布时间】:2016-05-26 22:27:04
【问题描述】:

这将每 5 秒发出一次滴答声。

Observable.interval(5, TimeUnit.SECONDS, Schedulers.io())
            .subscribe(tick -> Log.d(TAG, "tick = "+tick));

要阻止它,您可以使用

Schedulers.shutdown();

但是随后所有的调度程序都停止了,以后不可能恢复滴答作响。如何“优雅地”停止和恢复发射?

【问题讨论】:

标签: java rx-java rx-android


【解决方案1】:

这是一种可能的解决方案:

class TickHandler {

    private AtomicLong lastTick = new AtomicLong(0L);
    private Subscription subscription;

    void resume() {
        System.out.println("resumed");
        subscription = Observable.interval(5, TimeUnit.SECONDS, Schedulers.io())
                                 .map(tick -> lastTick.getAndIncrement())
                                 .subscribe(tick -> System.out.println("tick = " + tick));
    }

    void stop() {
        if (subscription != null && !subscription.isUnsubscribed()) {
            System.out.println("stopped");
            subscription.unsubscribe();
        }
    }
}

【讨论】:

  • @MathijsSegers 如果我正确理解了你的问题,Observable.interval 是一个冷观测值,所以取消订阅它会停止它的排放。同时,您可能还可以依赖Subscription(或RxJava 2 中的Disposable)的实现来丢弃对unsubscribe()dispose())上可观察对象的引用。或者您可以取消 subscription 引用并确保它符合 GC 条件,以及存储在其中的 observable。
  • 这里的好奇心问题。将 lastTick 声明为 Atomic 重要吗?我以为所有volatile 和 Atomic 的事情都是由 Rx 直接处理的。
  • @DanChaltiel 一般来说,当多个线程写入一个依赖于该字段前一个值的字段值时,volatile提供的保证不够强,需要访问通过可用的 Java 并发 API 同步或安排。更多关于volatiletutorials.jenkov.com/java-concurrency/…
  • 所以实际上 RxJava 没有暂停,只需取消订阅然后重新订阅。
【解决方案2】:
val switch = new java.util.concurrent.atomic.AtomicBoolean(true)
val tick = new java.util.concurrent.atomic.AtomicLong(0L)

val suspendableObservable = 
  Observable.
    interval(5 seconds).
    takeWhile(_ => switch.get()).
    repeat.
    map(_ => tick.incrementAndGet())

您可以将switch 设置为false 以暂停滴答声并设置true 以恢复它。

【讨论】:

  • 这个重复有关键。谢谢
【解决方案3】:

我认为这是另一种方法。
当您查看源代码时,您会发现 interval() 使用类 OnSubscribeTimerPeriodically。关键代码如下。

@Override
public void call(final Subscriber<? super Long> child) {
    final Worker worker = scheduler.createWorker();
    child.add(worker);
    worker.schedulePeriodically(new Action0() {
        long counter;
        @Override
        public void call() {
            try {
                child.onNext(counter++);
            } catch (Throwable e) {
                try {
                    worker.unsubscribe();
                } finally {
                    Exceptions.throwOrReport(e, child);
                }
            }
        }

    }, initialDelay, period, unit);
}

所以,您会看到,如果您想取消循环,那么在 onNext() 中抛出一个新异常会怎样。示例代码如下。

Observable.interval(1000, TimeUnit.MILLISECONDS)
            .subscribe(new Action1<Long>() {
                @Override
                public void call(Long aLong) {
                    Log.i("abc", "onNext");
                    if (aLong == 5) throw new NullPointerException();
                }
            }, new Action1<Throwable>() {
                @Override
                public void call(Throwable throwable) {
                    Log.i("abc", "onError");
                }
            }, new Action0() {
                @Override
                public void call() {
                    Log.i("abc", "onCompleted");
                }
            });

然后你会看到这个:

08-08 11:10:46.008 28146-28181/net.bingyan.test I/abc: onNext
08-08 11:10:47.008 28146-28181/net.bingyan.test I/abc: onNext
08-08 11:10:48.008 28146-28181/net.bingyan.test I/abc: onNext
08-08 11:10:49.008 28146-28181/net.bingyan.test I/abc: onNext
08-08 11:10:50.008 28146-28181/net.bingyan.test I/abc: onNext
08-08 11:10:51.008 28146-28181/net.bingyan.test I/abc: onNext
08-08 11:10:51.018 28146-28181/net.bingyan.test I/abc: onError             

【讨论】:

  • 异常应该只用于处理异常事件而不是实现逻辑!它创建了不必要的对象,计算量大且语法混乱
【解决方案4】:

对不起,这是在 RxJS 而不是 RxJava,但概念是一样的。我从learn-rxjs.io 改编了这个,这里是codepen

这个想法是您从两个点击事件流开始,startClick$stopClick$。在stopClick$ 流上发生的每次点击都会映射到一个空的可观察对象,而在startClick$ 上的每次点击都会映射到interval$ 流。两个生成的流将merge-d 一起变成一个可观察的可观察对象。换句话说,每次点击都会从merge 发出这两种类型之一的新可观察对象。生成的 observable 将通过switchMap,它开始收听这个新的 observable 并停止收听之前正在收听的任何内容。 Switchmap 也会开始将这个新的 observable 中的值合并到它现有的流中。

切换后,scan 只能看到interval$ 发出的“增量”值,而单击“停止”时看不到任何值。

在第一次点击发生之前,startWith 将开始从$interval 发出值,只是为了让事情顺利进行:

const start = 0;
const increment = 1;
const delay = 1000;
const stopButton = document.getElementById('stop');
const startButton = document.getElementById('start');
const startClick$ = Rx.Observable.fromEvent(startButton, 'click');
const stopClick$ = Rx.Observable.fromEvent(stopButton, 'click');
const interval$ = Rx.Observable.interval(delay).mapTo(increment);
const setCounter = newValue => document.getElementById("counter").innerHTML = newValue;
setCounter(start);

const timer$ = Rx.Observable

    // a "stop" click will emit an empty observable,
    // and a "start" click will emit the interval$ observable.  
    // These two streams are merged into one observable.
    .merge(stopClick$.mapTo(Rx.Observable.empty()), 
           startClick$.mapTo(interval$))

    // until the first click occurs, merge will emit nothing, so 
    // use the interval$ to start the counter in the meantime
    .startWith(interval$)

    // whenever a new observable starts, stop listening to the previous
    // one and start emitting values from the new one
    .switchMap(val => val)

    // add the increment emitted by the interval$ stream to the accumulator
    .scan((acc, curr) => curr + acc, start)

    // start the observable and send results to the DIV
    .subscribe((x) => setCounter(x));

这是 HTML

<html>
<body>
  <div id="counter"></div>
  <button id="start">
    Start
  </button>
  <button id="stop">
    Stop
  </button>
</body>
</html>

【讨论】:

    【解决方案5】:

    前段时间,我也在寻找一种 RX“定时器”解决方案,但没有一个符合我的期望。所以你可以在那里找到我自己的解决方案:

    AtomicLong elapsedTime = new AtomicLong();
    AtomicBoolean resumed = new AtomicBoolean();
    AtomicBoolean stopped = new AtomicBoolean();
    
    public Flowable<Long> startTimer() { //Create and starts timper
        resumed.set(true);
        stopped.set(false);
        return Flowable.interval(1, TimeUnit.SECONDS)
                .takeWhile(tick -> !stopped.get())
                .filter(tick -> resumed.get())
                .map(tick -> elapsedTime.addAndGet(1000));
    }
    
    public void pauseTimer() {
        resumed.set(false);
    }
    
    public void resumeTimer() {
        resumed.set(true);
    }
    
    public void stopTimer() {
        stopped.set(true);
    }
    
    public void addToTimer(int seconds) {
        elapsedTime.addAndGet(seconds * 1000);
    }
    

    【讨论】:

    • 我不敢相信没有人赞同这个答案。接受的答案会在每次暂停后创建一个新的 Observable,并且不适应 lambda 中的最终字段(您必须使用类字段)。这是一个更好的解决方案。
    • 这是唯一可以在生产代码中使用的答案。
    • 我很好奇为什么选择了 Flowable 而不是 Observable。这需要订阅用户明确请求值,对吧?
    • @RobertLewis 否您不必请求值。 Flowable 默认支持背压,这是有原因的。
    • 我很困惑。我的理解是 Flowable 在调用 request(n) 之前不会发出任何东西。为什么不使用Observable?请解释一下。
    【解决方案6】:

    您可以使用 takeWhile 并循环直到条件为真

    Observable.interval(1, TimeUnit.SECONDS)
            .takeWhile {
                Log.i(TAG, " time " + it)
                it != 30L
            }
            .subscribe(object : Observer<Long> {
                override fun onComplete() {
                    Log.i(TAG, "onComplete " + format.format(System.currentTimeMillis()))
                }
    
                override fun onSubscribe(d: Disposable) {
                    Log.i(TAG, "onSubscribe " + format.format(System.currentTimeMillis()))
                }
    
                override fun onNext(t: Long) {
                    Log.i(TAG, "onNext " + format.format(System.currentTimeMillis()))
                }
    
                override fun onError(e: Throwable) {
                    Log.i(TAG, "onError")
                    e.printStackTrace()
                }
    
            });
    

    【讨论】:

      【解决方案7】:

      @AndroidEx ,这是一个很棒的答案。我做的有点不同:

      private fun disposeTask() {
          if (disposeable != null && !disposeable.isDisposed)
            disposeable.dispose()
        }
      
       private fun runTask() {
          disposeable = Observable.interval(0, 30, TimeUnit.SECONDS)
      .flatMap {
              apiCall.runTaskFromServer()
      .map{
      
      when(it){
      is ResponseClass.Success ->{
      keepRunningsaidTasks()
      }
      is ResponseClass.Failure ->{
      disposeTask() //this will stop the task in instance of a network failure.
      }
      }
      
      }
      

      【讨论】:

        猜你喜欢
        • 1970-01-01
        • 1970-01-01
        • 1970-01-01
        • 2014-05-24
        • 1970-01-01
        • 1970-01-01
        • 1970-01-01
        • 1970-01-01
        • 2018-07-25
        相关资源
        最近更新 更多