【发布时间】:2018-08-22 16:11:43
【问题描述】:
我有以下 rxJava 链:
override fun combineLocationToPlace(req: Flowable<Place>): Flowable<Place> {
var combinedFlowable = Flowable
.combineLatest(
req,
getLastLocation().lastOrError().toFlowable(),
BiFunction<Place, Location, Place> { t1, location ->
Timber.w("FIRSTINIT - Retrieved location $location")
var placeLocation = Location(t1.placeName)
placeLocation.latitude = t1.latitude
placeLocation.longitude = t1.longitude
t1.distance = location.distanceTo(placeLocation)
t1
})
return combinedFlowable
.onErrorResumeNext { t: Throwable ->
Timber.w(t, "FIRSTINIT - Could not retrieve location for place (${t.message}) returning original request")
req
}
.doOnError {
Timber.w("FIRSTINIT - did detect the error here...")
}
return combinedFlowable
}
简而言之,我正在从本地数据库(一个地方)检索一些数据,我想将其与 GPS 的最新位置相结合:
override fun getLastLocation(requestIfEmpty: Boolean): Observable<Location> {
var lastLocation = locationProvider.lastKnownLocation
.doOnNext {
Timber.w("Got location $it from last one")
}
.doOnComplete {
Timber.w("did i get a location?")
}
if (requestIfEmpty) {
Timber.w("Switching to request of location")
lastLocation = lastLocation.switchIfEmpty(requestLocation())
}
return lastLocation.doOnNext {
Timber.w("Got something!")
location = it
}
}
但我想考虑用户没有最后位置的场景,因此该行:
return combinedFlowable
.onErrorResumeNext { t: Throwable ->
Timber.w(t, "FIRSTINIT - Could not retrieve location for place (${t.message}) returning original request")
req
}
.doOnError {
Timber.w("FIRSTINIT - did detect the error here...")
}
它试图捕获任何错误并仅使用原始请求重试而不将其与任何内容组合。我这样称呼这段代码:
fun getPlace(placeId: String) {
locationManager.combineLocationToPlace(placesRepository.getPlace(placeId))
.onErrorResumeNext { t: Throwable ->
Timber.e(t, "Error resuming next! ")
placesRepository.getPlace(placeId)
}.subscribeOn(schedulerProvider.io()).observeOn(schedulerProvider.ui())
.subscribeBy(
onNext = {
place.value = Result.success(it)
},
onError = {
Timber.e("ERROR! $it")
place.value = Result.failure(it)
}
)
.addTo(disposables)
}
但是,当没有位置抛出 NoSuchElementException 时,我的 flowable 切换到原始请求,然后在执行它时我得到一个 NetworkOnMainThread 异常。这个请求不应该在我放在那里的scheduler.io()上执行吗(因为我之前放了代码)?
如果您想知道,schedulerProvider.io() 转换为:
Schedulers.io()
获取地点:
/**
* Retrieves a single place from database
*/
override fun getPlace(id: String): Flowable<Place> {
return Flowable.merge(placesDao.getPlace(id),
refreshPlace(id).toFlowable())
}
/**
* Triggers a refreshPlace update on the db, useful when changing stuff associated with the place
* itself indirectly (e.g. an experience)
*/
private fun refreshPlace(id: String): Single<Place> {
return from(placesApi.getPlace(id))
.doOnSuccess {
placesDao.savePlace(it)
}
}
【问题讨论】:
-
onErrorResumeNext订阅指示错误的线程的后备。您应该申请req.subscribeOn(Schedulers.io())以确保那里的副作用发生在正确的线程上。至于observeOn或subscribeOn能达到多远,取决于所涉及的运算符,并且要正确猜测是一项非常重要的任务。因此,当您有疑问时,请始终使用subscribeOn和/或observeOn。 -
但我已经这样做了……
-
你说的原始请求是哪个
req?我没有看到任何req.subscribeOn。另外,如果您有错误,为什么不发布可以阐明问题所在的堆栈跟踪? -
你是对的,将 subscribeOn(schedulerProvider.io()) 附加到原始请求中解决了这个问题。谢谢!