【问题标题】:RxJava: Specifying maximum concurrency for a list of CompletablesRxJava:为 Completables 列表指定最大并发性
【发布时间】:2018-04-13 02:37:32
【问题描述】:

我正在尝试查询一个 API,该 API 为我提供了要下载的文件列表(如下所示)。然后我继续下载这些文件,同时重新查询 API 以查找初始调用中可能遗漏的任何内容。

Completable#mergeDelayError(Iterable<? extends CompletableSource> sources) 用于确保我可以并行执行多个任务并在一切完成时收到通知。

fun fetchAndDownload(details: List<String>): Completable = 
    exampleApi.fetchPackages(details) // This is a Single
        .flatMapCompletable { (results, retry) -> 
            val completables = mutableListOf<Completable>()
            results.mapTo(completables) { value ->
                exampleApi.download(value).subscribeOn(Schedulers.io())
            }

            if (retry.isNotEmpty()) { 
                completables += fetchAndDownload(retry)
                    .delay(3L, TimeUnit.SECONDS)
                    .subscribeOn(Schedulers.io())
            }
            Completable.mergeDelayError(completables)
        }

但是,这种实现可能会因为一次执行太多事情而使网络和/或线程数不堪重负。因此,我想知道最好的方法是限制一次运行的completables 的数量。

我知道Completable#mergeDelayError(Publisher&lt;? extends CompletableSource&gt; sources, int maxConcurrency),但不确定如何将我的List&lt;Completable&gt; 转换为所需的Publisher。另一种解决方案是提供具有最大线程数的自定义Scheduler,但我也不确定如何提供这样的Schduler(我可以在不再需要时清理并丢弃)。

【问题讨论】:

  • 使用 Flowable.fromIterable 进行转换。

标签: java kotlin rx-java reactive-programming rx-java2


【解决方案1】:

最简单的方法是使用Floable.fromIterableCompletableList 转换为Publisher

这将允许使用Completable#mergeDelayError(Publisher&lt;? extends CompletableSource&gt; sources, int maxConcurrency)

【讨论】:

    【解决方案2】:

    您可以使用 flatMapma​​xConcurrent 值,然后让您的管道异步运行。

      @Test
        public void asyncFlatMapWithMaxConcurrent() {
            Observable.from(Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9, 10))
                    .flatMap(value -> Observable.just(value)
                            .map(number -> {
                                try {
                                    Thread.sleep(1000);
                                    System.out.println(String.format("Value %s in Thread execution:%s",number, Thread.currentThread().getName()));
                                } catch (InterruptedException e) {
                                    e.printStackTrace();
                                }
                                return number;
                            }).subscribeOn(Schedulers.newThread())
                            , 2)//This is the max concurrenrt
            .subscribe();
            new TestSubscriber()
                    .awaitTerminalEvent(15, TimeUnit.SECONDS);    }
    

    如果你看到 flatMap 函数之后的第二个参数,我们传递一个值 2,它是可以为该 flatMap 运行的最大并发线程数

    您可以在此处查看示例和更多内容。

    https://github.com/politrons/reactive/blob/master/src/test/java/rx/observables/transforming/ObservableFlatMap.java

    【讨论】:

      猜你喜欢
      • 1970-01-01
      • 1970-01-01
      • 2022-01-23
      • 1970-01-01
      • 2017-07-22
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      相关资源
      最近更新 更多