【问题标题】:RxJava Kotlin combineLatest timeout before first elementRxJava Kotlin combineLatest 在第一个元素之前超时
【发布时间】:2019-09-07 11:50:51
【问题描述】:

我正在用 Kotlin 开发一个安卓应用。我有 3 个实时数据 observables。数据来自 Firestore。它们被包裹在 RxJava 的 Observable.combineLatest() 方法中。我想在第一次数据检索时设置超时。

我试图在每个 observables 上设置超时函数,但是在加载了所有 3 个 observables 的初始数据后它们会抛出 TimeoutException。

private fun retrieveAllData() {
Observable.combineLatest(
                retrieveDataObservable1().timeout(10, TimeUnit.SECONDS),
                retrieveDataObservable2().timeout(10, TimeUnit.SECONDS),
                retrieveDataObservable3().timeout(10, TimeUnit.SECONDS),
                Function3<String, String, Boolean, Triple<String, String, Boolean>>
                { firstResult, secondResult, ThirdResult ->
                    Triple(firstResult, secondResult, ThirdResult)
                })
                .subscribeOn(Schedulers.io())
                .observeOn(AndroidSchedulers.mainThread())
                .subscribe(
                        { Log.d(TAG, "success") },
                        { throwable -> Log.d(TAG, "error", throwable) }
                )
}

private fun retrieveDataObservable1(): Observable<String> {
        return Observable.create<String> { emitter ->
            val listener = dataRef1.addSnapshotListener { snapshot, e ->
                if (e != null) {
                    emitter.onError(e)
                    return@addSnapshotListener
                }
                emitter.onNext("some value")
            }
         emitter.setCancellable { listener.remove() }
     }
}

如果至少有一个 Observable 在 10 秒内没有发出任何项目,我预计 Observable.combineLatest() 在初始检索时会出现 TimeoutExceiption。 如果所有 observables 至少成功发出一次数据,则不应出现 TimeoutException。

【问题讨论】:

  • 我想你会为你的问题找到一些可行的解决方案here
  • @adrianbukros 我看过这个,但遗憾的是无法在 Kotlin 中使用
  • 试图让你成为一个例子。可以查一下吗?

标签: android kotlin timeout rx-java2 combinelatest


【解决方案1】:

它认为这应该适合你:

import io.reactivex.Observable
import io.reactivex.functions.Function
import io.reactivex.functions.Function3
import java.util.concurrent.TimeUnit

Observable.combineLatest(
    firstObservable,
    secondObservable,
    thirdObservable,
    Function3<String, String, String, String> { first, second, third -> "$first$second$third" })
    .timeout<String, String>(
       Observable.empty<String>().delay(10, TimeUnit.SECONDS),
       Function { Observable.never<String>() }
    )
    .subscribe {
        println("$it")
    }

timeout() 运算符的第一个参数是Observable,用于第一项超时。在我们的例子中是 10 秒。对于即将到来的项目,它是一个为所有项目返回 Observable.never() 的函数,因此它永远不会触发超时。

【讨论】:

  • 是的。这正是我一直在寻找的。非常感谢!
猜你喜欢
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
相关资源
最近更新 更多