【问题标题】:RxJava Flowable.Interval backpressure when flatmap with singleRxJava Flowable.Interval 使用单一平面图时的背压
【发布时间】:2018-04-13 06:23:15
【问题描述】:

我遇到了需要定期调用 API 来检查结果的情况。我正在使用Flowable.interval 创建一个调用 API 的区间函数。

但是,我遇到了背压问题。在下面的示例中,间隔中的每个刻度都会创建一个新单曲。预期的效果是仅在调用尚未进行时才调用 API

Flowable.interval(1, 1, TimeUnit.SECONDS).flatMap {
        System.out.println("Delay $it")

        //simulates API call
        Single.just(1L).doAfterSuccess {
            System.out.println("NEW SINGLE!!!")
        }.delay(4, TimeUnit.SECONDS).doAfterSuccess {
            System.out.println("SINGLE SUCCESS!!!")
        }.toFlowable()
    }.subscribeOn(Schedulers.io()).observeOn(Schedulers.computation()).blockingFirst()

我可以像这样使用过滤器变量来解决这个问题:

var filter = true

Flowable.interval(1, 1, TimeUnit.SECONDS).filter {
    filter
}.flatMap {

    System.out.println("Delay $it")

    Single.just(1L).doOnSubscribe {
        filter = true
    }.doAfterSuccess {
        System.out.println("NEW SINGLE!!!")
    }.delay(4, TimeUnit.SECONDS).doAfterSuccess {
        System.out.println("SINGLE!!!")
        filter = true
    }.toFlowable()
}.subscribeOn(Schedulers.io()).observeOn(Schedulers.computation()).blockingFirst()

但这似乎是一个 hacky 解决方案。我已经厌倦了在interval 函数之后应用onBackPressureDrop,但它没有效果。

有什么建议吗?

【问题讨论】:

  • 嗯,我认为 onBackPressureXXX 应该处理这个问题,因为它也在文档中描述:The operator generates values based on time and ignores downstream backpressure which * may lead to {@code MissingBackpressureException} at some point in the chain. * Consumers should consider applying one of the {@code onBackpressureXXX} operators as well.

标签: kotlin rx-java2 rx-kotlin2


【解决方案1】:

你也必须约束flatMap

Flowable.interval(1, 1, TimeUnit.SECONDS)
.onBackpressureDrop()
.flatMapSingle({
    System.out.println("Delay $it")

    //simulates API call
    Single.just(1L).doAfterSuccess {
        System.out.println("NEW SINGLE!!!")
    }.delay(4, TimeUnit.SECONDS).doAfterSuccess {
        System.out.println("SINGLE SUCCESS!!!")
    }
}, false, 1)  // <----------------------------------------------------------
.subscribeOn(Schedulers.io())
.observeOn(Schedulers.computation())
.subscribe()

【讨论】:

    猜你喜欢
    • 2015-12-23
    • 1970-01-01
    • 1970-01-01
    • 2018-08-24
    • 2017-03-01
    • 2023-03-14
    • 1970-01-01
    • 2013-11-26
    • 1970-01-01
    相关资源
    最近更新 更多