【问题标题】:How to combine two results of CompletableFuture when one depends on another?当一个依赖于另一个时,如何组合 CompletableFuture 的两个结果?
【发布时间】:2020-11-26 17:24:05
【问题描述】:

有 2 种服务:

  1. Service1 返回 ids1 列表;
  2. Service2 按 id1 返回 ids2 列表:

如何使用CompletableFuture 将结果按id1 -> [id2, id2, id2, ...] 分组?
我想把它全部放到 HashMap 中,但我不明白如何将 ids1 传递给第二个 Future。我正在考虑使用thenCombine(ids1Future, (v1, v2) -> {...}),但这对我来说似乎很丑陋。 有什么“最佳实践”可以做到吗?

这里不是异步方式:

Map<ID1, List<ID2>> idsMap = new HashMap();
      
List<IDS1> ids1List = service1.service.find();
      
for (ID1 id1: IDS1) {
    List<IDS2> ids2List = service2.findById(id1);
    idsMap.put(id1, ids2List);
}
service2.process(idsMap);

尝试异步:

CompletableFuture<List<IDS1>> ids1Future
        = CompletableFuture.supplyAsync(() -> service.find());

ids1Future.thenApply((ids1) -> {
    CompletableFuture<List<IDS2>> listFutureIds2
            = ids1.stream().map(id1
                    -> CompletableFuture.supplyAsync(()
                    -> service2.getById(id1)))
                    .collect(Collectors.toList());

    CompletableFuture<Void> allFuturesDone
            = CompletableFuture.allOf(
                    listFutureIds2.toArray(
                            new CompletableFuture[listFutureIds2.size()]));
    allFuturesDone.thenApply(v
            -> list.stream()
                    .map(CompletableFuture::join)
                    .collect(toList()));

});

【问题讨论】:

  • 你需要的是一个 fork/join 实现docs.oracle.com/javase/tutorial/essential/concurrency/…
  • 你先提供一些符合你描述的代码怎么样? CompletableFuture&lt;List&lt;IDS1&gt;&gt; listFutureIds2 应该是 CompletableFuture&lt;List&lt;IDS2&gt;&gt; listFutureIds2?
  • 我编辑了代码。我希望现在我的意思更清楚了。

标签: java java.util.concurrent completable-future


【解决方案1】:

这是一个如何使用 Fork/Join 将字符列表转换为字符串列表的示例,其中每个字符串是该字符 N 次的复制。您可以根据自己的情况调整此示例。

我希望这就是你要找的。​​p>

import java.util.Collection;
import java.util.List;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.ForkJoinTask;
import java.util.concurrent.RecursiveTask;
import java.util.stream.Collectors;

public class ForkJoinExample {
    
    public static class RepeatTask extends RecursiveTask<List<String>> {

        private final List<Character> chars;
        private final int repeatCount;

        public RepeatTask(List<Character> chars, int repeatCount) {
            this.chars = chars;
            this.repeatCount = repeatCount;
        }

        public RepeatTask(Character c, int repeatCount) {
            this.chars = List.of(c);
            this.repeatCount = repeatCount;
        }

        @Override
        protected List<String> compute() {
            if (chars.size() > 1) {
                ForkJoinTask.invokeAll(chars.stream().map(c -> new RepeatTask(c, repeatCount)).collect(Collectors.toList()));
                return ForkJoinTask.invokeAll(chars.stream().map(c -> new RepeatTask(c, repeatCount)).collect(Collectors.toList()))
                        .stream()
                        .map(ForkJoinTask::join)
                        .flatMap(Collection::stream)
                        .collect(Collectors.toList());
            }
            return List.of(String.valueOf(chars.get(0)).repeat(Math.max(0, repeatCount)));
        }

    }

    public static void main(String[] args) {
        ForkJoinPool commonPool = ForkJoinPool.commonPool();
        RepeatTask task = new RepeatTask(List.of('a', 'b', 'c'), 4);
        commonPool.execute(task);
        System.out.println(task.join());
    }
}

结果是这个:[aaaa, bbbb, cccc]

【讨论】:

  • 这很有帮助,但有没有办法使用 CompletableFuture 做到这一点?
  • 这如何回答这个问题?
  • @Eugene 这是 fork/join 的经典案例。您有一个元素列表,您为每个元素拆分工作,然后加入结果。显然,问题可以通过多种方式解决,其中之一就是您的解决方案。我的解决方案的一个优点是,如果 service2 能够返回许多 id 的结果列表(例如优化网络流量),那么这个解决方案仍然适用。
【解决方案2】:

希望我理解正确,所以这里是:

   CompletableFuture<List<IDS1>> ids1Future = CompletableFuture.supplyAsync(service::find);

   CompletableFuture<Map<IDS1, List<IDS2>>> map = ids1Future.thenApply(listIDS1 -> {
        List<Entry<IDS1, CompletableFuture<List<IDS2>>>> list =
            listIDS1
                .stream()
                .map(ids1 -> new SimpleEntry<>(ids1, CompletableFuture.supplyAsync(() -> service2.getById(ids1))))
                .collect(Collectors.toList());

        return list.stream()
                   .map(entry -> new SimpleEntry<>(entry.getKey(), entry.getValue().join()))
                   .collect(Collectors.toMap(Entry::getKey, Entry::getValue));

    });

    Map<IDS1, List<IDS2>> result = map.join();

【讨论】:

  • 好像。我要检查一下。很快就会回来)
  • @poena_cullei 有帮助吗?
猜你喜欢
  • 2016-07-18
  • 1970-01-01
  • 2021-09-18
  • 1970-01-01
  • 1970-01-01
  • 2015-04-09
  • 1970-01-01
  • 2017-02-19
  • 1970-01-01
相关资源
最近更新 更多