【问题标题】:Caused by: rx.exceptions.MissingBackpressureException原因:rx.exceptions.MissingBackpressureException
【发布时间】:2015-12-24 04:44:45
【问题描述】:

我还有一个问题。这次我在执行这段代码的过程中遇到了这个错误Caused by: rx.exceptions.MissingBackpressureException

class UpdateHelper {
val numberOfFileToUpdate: PublishSubject<Int>

init {
    numberOfFileToUpdate = PublishSubject.create()
}

public fun startUpdate(): Observable<Int>{
    return getProducts().flatMap { products: ArrayList<Product> ->
            numberOfFileToUpdate.onNext(products.size)
            return@flatMap saveRows(products)
        }
}

private fun getProducts(): Observable<ArrayList<Product>> {
    return Observable.create {
        var products: ArrayList<Product> = ArrayList()
        var i = 0
        while (i++ < 100) {
            products.add(Product())
        }

        it.onNext(products)
        it.onCompleted()
    }
}


private fun saveRows(products: ArrayList<Product>): Observable<Int> {
    return Observable.create<Int> {
        var totalNumberOfRow = products.size

        while (totalNumberOfRow-- > 0){
            it.onNext(products.size - totalNumberOfRow)
            Thread.sleep(100)
        }
        it.onCompleted()
    }
}

}

代码只是两个进程的测试代码。第一个过程从网络获取Product 的列表,然后将这些产品持久化到应用程序内的本地数据库中。这是主要思想。

getProducts 方法负责获取数据,在本例中,我只创建了一个包含 100 个产品的 ArrayList。 saveRows 做持久化工作。

saveRows 方法发出一个代表已保存行的 Int。我这样做是因为在 UI 中有一个进度条报告进度。

从应用程序的另一点,我调用方法startUpdate,在发出一些项目后,我得到了描述异常

at com.techbyflorin.rockapan.helpers.UpdateHelper$saveRows$1.call(UpdateHelper.kt:46)

at com.techbyflorin.rockapan.helpers.UpdateHelper$saveRows$1.call(UpdateHelper.kt:40)

我理解为什么会发生此异常https://github.com/ReactiveX/RxJava/wiki/Backpressure,但我不知道我做错了什么或如何解决它。

谁能给我建议。

【问题讨论】:

    标签: android rx-java kotlin


    【解决方案1】:

    问题是您的 Observable 源发出的速度比消费者消耗的快。保存每个产品需要 100 毫秒。您可以添加 onBackpressureBuffer()。

    UpdateHelper().startUpdate()
        .onBackpressureBuffer() // Add this
        .subscribeOn(Schedulers.io())
        .observeOn(AndroidSchedulers.mainThread())
        .subscribe({
          Log.d(TAG, "next $it")
        }, {
          Log.d(TAG, it.message)
        }, {
        })
    

    另外,您可以尝试删除Thread.sleep(100)

    平面图使用OperatorMerge (merge(map(func))):您可以看到,在您的情况下,map 的 onNexts 发送速度比请求的速度快。

    【讨论】:

      猜你喜欢
      • 1970-01-01
      • 1970-01-01
      • 2014-11-13
      • 2017-04-10
      • 1970-01-01
      • 1970-01-01
      • 2020-11-11
      • 2015-11-16
      • 2014-05-31
      相关资源
      最近更新 更多