【问题标题】:RxJava delayed observables fire with unexpected delaysRxJava 延迟 observables 触发意外延迟
【发布时间】:2017-03-13 18:10:00
【问题描述】:

我有一个使用 RxJava observables 设置的事件序列。基本上,我将使用Observable.just(Events.*) 创建的不同事件与observable.delay(time, timeUnit, scheduler) 函数设置的不同延迟合并在一起。然后我将它们发布到PublishSubject(下面代码中的events)并订阅PublishSubject 以观察序列(下面代码中的observeEvents() 函数)。它曾经工作正常,但最近我在我的设备上看到了一个非常奇怪的行为(带有 android 5.0.2 的 OnePlus One)(并且在模拟器上看不到它)。基本上事件会混淆,具有较高延迟的事件可能会出现在具有较小延迟的事件之前,具有较小延迟的事件可能会出现在队列的末尾,有时所有事件都可能按正确的顺序出现。前 3 个事件尤其经常混合。有时某些事件根本没有被观察到。这里会发生什么?

代码在 Kotlin 中:

var computationScheduler = Schedulers.computation()

private val events: PublishSubject<Events> = PublishSubject.create()
private val userActionSubject: PublishSubject<Events> = PublishSubject.create()

Observable.merge(
            event0(),
            event1(),
            event2(),
            userActionOrEvent3(),
            userActionOrEvent4())
            .subscribe({
                // Weird timings are observed here already
                events.onNext(it)
            }, { e ->
                events.onError(e)
            }))

private fun userActionOrEvent4(): Observable<Events> {
    return Observable.amb(Observable.just(Events.Event4)
            .delay(12800, TimeUnit.MILLISECONDS, computationScheduler), userActionSubject.asObservable().subscribeOn(computationScheduler))
            .take(1)
}

private fun userActionOrEvent3(): Observable<Events> {
    return Observable.amb(Observable.just(Events.Event3)
            .delay(2800, TimeUnit.MILLISECONDS, computationScheduler), userActionSubject.asObservable().subscribeOn(computationScheduler))
            .take(1)
}

private fun event2() = Observable.just(Events.Event2)
        .delay(1800, TimeUnit.MILLISECONDS, computationScheduler)

private fun event1() = Observable.just(Events.Event1)
        .delay(200, TimeUnit.MILLISECONDS, computationScheduler)

private fun event0() = Observable.just(Events.Event0)
        .subscribeOn(computationScheduler)

open fun observeEvents(): Observable<Events> = events.asObservable().observeOn(AndroidSchedulers.mainThread())

open fun onUserAction() {
    userActionSubject.onNext(Events.Action)
}

【问题讨论】:

    标签: android rx-java rx-android


    【解决方案1】:

    这是因为您使用的是merge() 而不是concat

    这篇中篇文章将为你解释https://medium.com/fueled-android/rxify-a-simple-spell-for-complex-rxjava-operators-part-2-b82b379f5c7f#.ekppat50o的区别

    【讨论】:

      【解决方案2】:

      原来问题是由computationScheduler 引起的,当我将Schedulers.computation() 更改为Schedulers.newThread() 时,事件开始在预期的时间触发。

      【讨论】:

        猜你喜欢
        • 2013-07-07
        • 2018-09-03
        • 2012-06-08
        • 1970-01-01
        • 1970-01-01
        • 1970-01-01
        • 1970-01-01
        • 1970-01-01
        • 1970-01-01
        相关资源
        最近更新 更多