【发布时间】:2018-10-24 15:49:22
【问题描述】:
我正在尝试桥接一些现有的 rx 代码和使用期货的 api。当我 手动处理我希望 onDispose() 始终被调用的可观察对象。它通常会,但是当我指定自定义调度程序时,它有时不会被调用。我的例子:
class Work {
private val disposables = CompositeDisposable()
fun getFuture(): ListenableFuture<String> {
val future = ResolvableFuture.create<String>()
disposables.add(
Observable.fromCallable {
try {
Thread.sleep(2000)
} catch (ex: InterruptedException) {
}
"1"
}.firstOrError()
.onErrorReturn { "2" }
.doOnDispose {
println("disposing 1 on ${Thread.currentThread().name}")
//sometimes this dispose does not get called
future.set("2")
}
.subscribeOn(Schedulers.io())
.doOnDispose {
println("disposing 2 on ${Thread.currentThread().name}")
//only this dispose gets called every time
//future.set("2")
}
.subscribe(Consumer {
future.set("2")
})
)
return future
}
fun stop() {
disposables.clear()
}
}
@Test
fun `doOnDispose does not get called`() {
println("------------")
for (i in 1..100) {
val work = Work()
val future = work.getFuture()
println("Cancelling")
work.stop()
println("Getting ${Thread.currentThread().name}")
val result = future.get(2, TimeUnit.SECONDS)
assertEquals("2", result)
println("------------")
}
}
每次只调用第二个 onDispose 。 .subscribeOn() 之前的那个有时根本不会被调用。
【问题讨论】:
标签: rx-java rx-java2 rx-kotlin