【问题标题】:How to continue the stream with error in Rx?如何在 Rx 中继续出现错误的流?
【发布时间】:2018-11-23 02:30:39
【问题描述】:

我正在使用 Kotlin、RxJava、Retrofit 开发一个 Android 应用程序。 我想向服务器发送 Http 请求。

PUT - 作业的更新选项

POST - 运行作业

第一个请求成功后,我发送第二个请求。所以我使用了 concatMap。

val updateJob = restService.updateJob(token, job.id, options) // PUT
val runJob = restService.runJob(token, job.id) // POST

updateJob.concatMap { runJob }
    .subscribeOn(Schedulers.io())
    .observeOn(AndroidSchedulers.mainThread())
    .subscribe({ job ->
        Log.d(TAG, "runJob - success: $job")
    }, {
        Log.e(TAG, "runJob - failed: ${it.message}")
        it.printStackTrace()
    })

但我不知道是否有像下面这样的多个工作。

我有一份工作清单。 如果一项作业的“更新”请求失败,则不应发送“运行”请求。 但下一份工作应该继续。 为此,我编写了如下代码。

    Observable.fromIterable(jobs.toList())
    .concatMap { job ->
        val updateJob = restService.updateJob(token, job.id, job)   // HTTP PUT Request
        val runJob = restService.runJob(token, job.id)  // HTTP POST Request

        updateJob.concatMap { runJob }
    }
    .window(1)
    .subscribeOn(Schedulers.io())
    .observeOn(AndroidSchedulers.mainThread())
    .subscribe({
        it.subscribeOn(Schedulers.io())
            .observeOn(AndroidSchedulers.mainThread())
            .subscribe({
                Log.e(TAG, "run job - success")
            }, {
                Log.e(TAG, "run job - failed - 1: ${it.message}")
                it.printStackTrace()
            })
    }, {
        Log.e(TAG, "run job - failed - 2: ${it.message}")
        it.printStackTrace()
    })

我认为“窗口”运算符可能是解决方案。 但它不... 如果某些作业失败,则流将通过 onError() 结束。 我该如何解决这个问题?

【问题讨论】:

  • 使用concatMapDelayErroronErrorResumeNext怎么样?
  • 这种情况下,应该如何使用onErrorResumeNext?
  • @Malt,我使用“onErrorResumeNext”解决了这个问题。我的代码被更改为删除了“window()”。谢谢你:)

标签: android kotlin rx-java reactive-programming


【解决方案1】:

我使用以下代码解决了这个问题。

Observable.fromIterable(jobs.toList())
    .concatMap { job ->
        val updateJob = restService.updateJob(token, job.id, job)   // HTTP PUT Request
                .onErrorResumeNext(Observable.empty<Job>()) // Solution Point
        val runJob = restService.runJob(token, job.id)  // HTTP POST Request

        updateJob.concatMap { runJob }
    }
    // .window(2)    I removed this line.
    .subscribeOn(Schedulers.io())
    .observeOn(AndroidSchedulers.mainThread())
    .subscribe({
        Log.d(TAG, "run job - success")
    }, {
        Log.e(TAG, "run job - failed - 2: ${it.message}")
        it.printStackTrace()
    })

【讨论】:

    猜你喜欢
    • 2021-12-22
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2019-01-11
    • 1970-01-01
    • 2013-07-29
    • 1970-01-01
    • 2015-05-12
    相关资源
    最近更新 更多