【发布时间】:2015-12-24 04:44:45
【问题描述】:
我还有一个问题。这次我在执行这段代码的过程中遇到了这个错误Caused by: rx.exceptions.MissingBackpressureException:
class UpdateHelper {
val numberOfFileToUpdate: PublishSubject<Int>
init {
numberOfFileToUpdate = PublishSubject.create()
}
public fun startUpdate(): Observable<Int>{
return getProducts().flatMap { products: ArrayList<Product> ->
numberOfFileToUpdate.onNext(products.size)
return@flatMap saveRows(products)
}
}
private fun getProducts(): Observable<ArrayList<Product>> {
return Observable.create {
var products: ArrayList<Product> = ArrayList()
var i = 0
while (i++ < 100) {
products.add(Product())
}
it.onNext(products)
it.onCompleted()
}
}
private fun saveRows(products: ArrayList<Product>): Observable<Int> {
return Observable.create<Int> {
var totalNumberOfRow = products.size
while (totalNumberOfRow-- > 0){
it.onNext(products.size - totalNumberOfRow)
Thread.sleep(100)
}
it.onCompleted()
}
}
}
代码只是两个进程的测试代码。第一个过程从网络获取Product 的列表,然后将这些产品持久化到应用程序内的本地数据库中。这是主要思想。
getProducts 方法负责获取数据,在本例中,我只创建了一个包含 100 个产品的 ArrayList。 saveRows 做持久化工作。
saveRows 方法发出一个代表已保存行的 Int。我这样做是因为在 UI 中有一个进度条报告进度。
从应用程序的另一点,我调用方法startUpdate,在发出一些项目后,我得到了描述异常
at com.techbyflorin.rockapan.helpers.UpdateHelper$saveRows$1.call(UpdateHelper.kt:46)
at com.techbyflorin.rockapan.helpers.UpdateHelper$saveRows$1.call(UpdateHelper.kt:40)
我理解为什么会发生此异常https://github.com/ReactiveX/RxJava/wiki/Backpressure,但我不知道我做错了什么或如何解决它。
谁能给我建议。
【问题讨论】: