【发布时间】:2018-04-13 02:37:32
【问题描述】:
我正在尝试查询一个 API,该 API 为我提供了要下载的文件列表(如下所示)。然后我继续下载这些文件,同时重新查询 API 以查找初始调用中可能遗漏的任何内容。
Completable#mergeDelayError(Iterable<? extends CompletableSource> sources) 用于确保我可以并行执行多个任务并在一切完成时收到通知。
fun fetchAndDownload(details: List<String>): Completable =
exampleApi.fetchPackages(details) // This is a Single
.flatMapCompletable { (results, retry) ->
val completables = mutableListOf<Completable>()
results.mapTo(completables) { value ->
exampleApi.download(value).subscribeOn(Schedulers.io())
}
if (retry.isNotEmpty()) {
completables += fetchAndDownload(retry)
.delay(3L, TimeUnit.SECONDS)
.subscribeOn(Schedulers.io())
}
Completable.mergeDelayError(completables)
}
但是,这种实现可能会因为一次执行太多事情而使网络和/或线程数不堪重负。因此,我想知道最好的方法是限制一次运行的completables 的数量。
我知道Completable#mergeDelayError(Publisher<? extends CompletableSource> sources, int maxConcurrency),但不确定如何将我的List<Completable> 转换为所需的Publisher。另一种解决方案是提供具有最大线程数的自定义Scheduler,但我也不确定如何提供这样的Schduler(我可以在不再需要时清理并丢弃)。
【问题讨论】:
-
使用 Flowable.fromIterable 进行转换。
标签: java kotlin rx-java reactive-programming rx-java2