【问题标题】:CompletableFuture allof(..).join() vs CompletableFuture.join()CompletableFuture allof(..).join() 与 CompletableFuture.join()
【发布时间】:2019-02-21 13:32:17
【问题描述】:

我目前正在使用 CompletableFuture supplyAsync() 方法将一些任务提交到公共线程池。这是代码 sn-p 的样子:

final List<CompletableFuture<List<Test>>> completableFutures = resolvers.stream()
        .map(resolver -> supplyAsync(() -> task.doWork()))
        .collect(toList());

CompletableFuture.allOf(completableFutures.toArray(new CompletableFuture[completableFutures.size()])).join();

final List<Test> tests = new ArrayList<>();
completableFutures.stream()
        .map(completableFuture -> completableFuture.getNow())
        .forEach(tests::addAll);

我想知道下面的代码与上面的代码有何不同。我从下面的代码中删除了父 completableFuture,并为每个 completableFuture 添加了 join() 而不是 getNow():

final List<CompletableFuture<List<Test>>> completableFutures = resolvers.stream()
        .map(resolver -> supplyAsync(() -> task.doWork()))
        .collect(toList());

final List<Test> tests = new ArrayList<>();
completableFutures.stream()
        .map(completableFuture -> completableFuture.join())
        .forEach(tests::addAll);

我在 spring 服务中使用它,存在线程池耗尽的问题。任何指针都非常感谢。

【问题讨论】:

  • 为什么在第二个例子中使用join?你可以做List&lt;Test&gt; tests = futures.stream().map(CompletableFuture::get).collect(toList())
  • @daniu 相反,你不能这样做,因为get 被声明为抛出一个检查异常。
  • 据我了解 get vs join 是关于它抛出的异常。检查与未检查。我不确定使用加入是否存在任何其他问题。谢谢指点
  • myThreadFactory.getExecutorService() - 它需要一个现有的线程池还是创建新的?如果创建,则说明线程池耗尽。
  • 两种变体在线程耗尽方面是等效的。原因可能在 task.doWork() 的代码中 - 如果它运行缓慢并且任务太多。只需使用固定线程池即可。

标签: java spring multithreading threadpool completable-future


【解决方案1】:

首先,.getNow() 不起作用,因为此方法需要一个备用值作为未来尚未完成的情况的参数。既然你假设未来在这里完成,你也应该使用join()

然后,线程耗尽没有区别,因为在任何一种情况下,您都在等待所有作业完成后再继续,可能会阻塞当前线程。

避免这种情况的唯一方法是重构代码以不同步预期结果,而是在所有作业都完成后安排后续处理操作完成。然后,使用allOf 变得相关:

final List<CompletableFuture<List<Test>>> completableFutures = resolvers.stream()
    .map(resolver -> supplyAsync(() -> task.doWork()))
    .collect(toList());

CompletableFuture.allOf(completableFutures.toArray(new CompletableFuture<?>[0]))
    .thenAccept(justVoid -> {
        // here, all jobs have been completed
        final List<Test> tests = completableFutures.stream()
            .flatMap(completableFuture -> completableFuture.join().stream())
            .collect(toList());
        // process the result here
    });

顺便说一下,关于集合的toArray方法,我推荐阅读Arrays of Wisdom of the Ancients...

【讨论】:

  • 澄清你的第一段。关于 allOf 方法,我读到它说它并行执行所有线程并将等待所有线程完成。所以我希望所有线程都在那里完成,而且我正在使用 CompletableFuture.allOf(...).join() 并因此询问 allOf 版本。在您使用 thenAccept 的示例中,这意味着当前线程将在 allOf 方法之后继续其工作?如果无法实现案例同步结果,那么我需要查看调整池大小以使其同步,因为这是我需要的行为?有更好的选择吗?线程池执行器?
  • allOf 不会影响期货如何完成。它所做的只是创建一个新的未来,当数组中的所有未来都完成时,如果它们是同步的,它们的完成已经在进行中。对allOf 返回的future 调用join 就像对每个future 调用join。没有不同。 join 块,这是基本的,不可避免的一点。如果在 F/J 工作线程中调用它,F/J 池将启动一个补偿线程,以确保配置的目标并行度。但是如果过度调用join,结果可能是灾难性的。
猜你喜欢
  • 1970-01-01
  • 2018-01-11
  • 2023-03-28
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 2023-04-04
  • 2016-06-08
相关资源
最近更新 更多