【问题标题】:doOnDispose does not get called when subscribing on a background scheduler在后台调度程序上订阅时不会调用 doOnDispose
【发布时间】:2018-10-24 15:49:22
【问题描述】:

我正在尝试桥接一些现有的 rx 代码和使用期货的 api。当我 手动处理我希望 onDispose() 始终被调用的可观察对象。它通常会,但是当我指定自定义调度程序时,它有时不会被调用。我的例子:

class Work {

private val disposables = CompositeDisposable()

fun getFuture(): ListenableFuture<String> {

    val future = ResolvableFuture.create<String>()

    disposables.add(

            Observable.fromCallable {
                try {
                    Thread.sleep(2000)
                } catch (ex: InterruptedException) {

                }
                "1"
            }.firstOrError()
                .onErrorReturn { "2" }
                .doOnDispose {
                    println("disposing 1 on ${Thread.currentThread().name}")
                    //sometimes this dispose does not get called
                    future.set("2")
                }
                .subscribeOn(Schedulers.io())
                .doOnDispose {
                    println("disposing 2 on ${Thread.currentThread().name}")
                    //only this dispose gets called every time
                    //future.set("2")
                }
                .subscribe(Consumer {
                    future.set("2")
                })
    )

    return future
}

fun stop() {
    disposables.clear()
}

}

@Test
fun `doOnDispose does not get called`() {
    println("------------")

    for (i in 1..100) {

        val work = Work()

        val future = work.getFuture()

        println("Cancelling")

        work.stop()

        println("Getting ${Thread.currentThread().name}")
        val result = future.get(2, TimeUnit.SECONDS)

        assertEquals("2", result)

        println("------------")
    }
}

每次只调用第二个 onDispose 。 .subscribeOn() 之前的那个有时根本不会被调用。

【问题讨论】:

    标签: rx-java rx-java2 rx-kotlin


    【解决方案1】:

    你在这里混合模式。不要那样做。使用 RxJava 或使用 Futures。您已经开始了维护噩梦。

    io() 调度程序上创建了一个线程,该线程立即休眠 2 秒。观察者链被释放(“disposing 2”),释放资源,dispose() 在链上继续工作。但是,dispose() 不会在 io() 线程上执行任何操作,因为它被 sleep() 阻止。现在,无论是线程完成还是 dispose 操作先进行,都存在竞争条件。

    我无法建议您如何解决该问题,因为我不知道您要做什么。我只知道你所拥有的是不可靠的。

    【讨论】:

    • 是的,我绝对不会使用期货,但是您需要以某种方式将 android workmanger 与 Rxjava 粘合,否则您必须使用 toBlocking() 这很糟糕developer.android.com/jetpack/docs/release-notes
    • 好吧,get() 上的 Future 也被阻塞了。我怀疑您可能不清楚观察者链的生命周期:如果您需要在操作完成或不再需要时释放资源,那么创建一个管理所有这些步骤的可观察对象可能会更好。跨度>
    猜你喜欢
    • 2012-01-14
    • 2018-02-19
    • 2020-04-22
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多