【问题标题】:RxJava Filter on ErrorRxJava 错误过滤器
【发布时间】:2020-09-01 20:59:07
【问题描述】:

这个问题与this question 松散相关,但没有答案。 Bob Dalgleish 的回答很接近,但不支持来自 Single 的潜在错误(我认为 OP 实际上也想要)。

我基本上是在寻找一种“过滤错误”的方法——但当查找是基于 RX 时,我认为这不存在。我正在尝试获取值列表,通过查找运行它们,并跳过任何返回查找失败(可抛出)的结果。我无法弄清楚如何以被动方式完成此任务。

我尝试了各种形式的error handling operators 与映射相结合。过滤器仅适用于原始值 - 或者至少我不知道如何使用它来支持我想做的事情。

在我的用例中,我迭代了一个 ID 列表,从远程服务请求每个 ID 的数据。如果服务返回 404,则该项目不再存在。我应该从本地数据库中删除不存在的项目并继续处理 ID。流应返回查找值的列表。

这是一个松散的例子。如何编写 getStream() 以便 canFilterOnError 通过?

import io.reactivex.Single
import io.reactivex.schedulers.Schedulers
import org.junit.Test

class SkipExceptionTest {

    private val data: Map<Int, String> = mapOf(
            Pair(1, "one"),
            Pair(2, "two"),
            Pair(4, "four"),
            Pair(5, "five")
    )

    @Test
    fun canFilterOnError() {

        getStream(listOf(1, 2, 3, 4, 5))
                .subscribeOn(Schedulers.trampoline())
                .observeOn(Schedulers.trampoline())
                .test()
                .assertComplete()
                .assertNoErrors()
                .assertValueCount(1)
                .assertValue {
                    it == listOf(
                            "one", "two", "four", "five"
                    )
                }
    }

    fun getStream(list: List<Int>): Single<List<String>> {
        // for each item in the list
        // get it's value via getValue()
        // if a call to getValue() results in a NotFoundException, skip that value and continue
        // mutate the results using mutate()

        TODO("not implemented")
    }

    fun getValue(id: Int): Single<String> {
        return Single.fromCallable {
            val value: String? = data[id]
            if (value != null) {
                data[id]
            } else {
                throw NotFoundException("dat with id $id does not exist")
            }
        }
    }

    class NotFoundException(message: String) : Exception(message)
}

【问题讨论】:

    标签: kotlin rx-java


    【解决方案1】:

    首先是.materialize(),然后是.filter()(非错误事件),然后是.dematerialize()

    getStream(/* ... */)
      .materialize()
      .filter(notification -> { return !notification.isOnError(); })
      .dematerialize()
    

    【讨论】:

    • 这看起来确实像我正在寻找的东西。我正在为上面的示例编写此代码,如果我们可以将工作代码(您的或我的)纳入此答案,我将给予您信任。当错误出现时,我当前的尝试结束了流 - 所以我只得到错误之前的值。
    • 在 Java 中添加了一个 sn-p,尝试在您的代码中添加 .getStream() 之后的最后 3 行(但随后在 Kotlin 中)。
    • 我基本上是这样做的,并得到 ("one", "two") 作为输出,但未通过测试。这是我所拥有的要点。 gist.github.com/randallmitchell/…
    • 据我所知,materialize 让我在错误进入下游之前过滤掉它,但它并不能阻止错误停止上游事件......所以我没有得到任何额外的响应.
    • 我明白了,在这种情况下,将过滤应用于 .getValue() 方法而不是 .getStream()
    【解决方案2】:

    我最终将getValue() 映射到Optional&lt;String&gt;,然后调用onErrorResumeNext() 并返回Single.error()Single.just(Optional.empty())。从那里,主流可以过滤掉空的 Optional。

    private fun getStream(list: List<Int>): Single<List<String>> {
        return Observable.fromIterable(list)
                .flatMapSingle {
                    getValue(it)
                            .map {
                                Optional.of(it)
                            }
                            .onErrorResumeNext {
                                when (it) {
                                    is NotFoundException -> Single.just(Optional.empty())
                                    else -> Single.error(it)
                                }
                            }
                }
                .filter { it.isPresent }
                .map { it.get() }
                .toList()
    }
    

    【讨论】:

      猜你喜欢
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 2016-05-18
      • 2018-10-07
      相关资源
      最近更新 更多