【问题标题】:Combine the result of a list of parallel tasks to a single HashMap using RxJava使用 RxJava 将并行任务列表的结果组合到单个 HashMap
【发布时间】:2016-11-27 11:22:43
【问题描述】:

我有一个List<Task>,其中Task 是一个带有返回Map<String, JsonElement> 的单一方法的接口。如何并行执行List<Task> 并返回一个新的HashMap 以及每个Task 的组合结果?

我目前有这个:

List<Task> tasks = getTasks();

Observable.from(tasks)
    .flatMap(new Func1<Task, Observable<Map<String, JsonElement>>>() {
        @Override
        public Observable<Map<String, JsonElement>> call(Task task) {
            return Observable.just(task.get());
        }
    });

// group into single Map<String,JsonElement>
// create Observable<Map<String,JsonElement>> with all results

【问题讨论】:

  • 您希望通过什么值将您的Task 组合/分组到新的HashMap 中?
  • 每个 Task.get() 的结果应该使用map.putAll() 合并到新的HashMap

标签: java rx-java reactive-programming rx-android


【解决方案1】:

使用defer 封装每个订阅的Map 和一个基于您想要的线程池大小的Scheduler

Scheduler scheduler = Schedulers.from(Executors.newFixedThreadPool(5));
Observable.defer(() -> {
  final Map<String, JsonElement> map = new ConcurrentHashMap<>();
  return Observable
    .from(tasks)
    .flatMap(task -> 
      Observable
        .fromCallable(task -> task.get())
        .doOnNext(mp -> map.putAll(mp)) 
        .subscribeOn(scheduler))
    .ignoreElements()
    .concatWith(Observable.just(map));
});

请注意,Scheduler 的选择将取决于正在执行的任务的性质。如果 Schedulers.computation() 是 CPU 控制的,您可能会对它感到满意。

【讨论】:

    猜你喜欢
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2012-10-12
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多