【问题标题】:How to use CompletableFuture to use result of first Callable task as arg to all subsequent Callable tasks?如何使用 CompletableFuture 将第一个 Callable 任务的结果用作所有后续 Callable 任务的 arg?
【发布时间】:2021-03-09 19:13:17
【问题描述】:

如何使用 CompletableFuture 将第一个 Callable 任务的结果用作所有后续 Callable 任务的 arg?我有 3 个任务需要像这样运行:

  1. 第一个阻塞任务运行并返回一个值
  2. 第二个和第三个任务使用从第一个任务提供的参数异步运行并返回值。
  3. 所有 3 个值汇总为所有值的最终结果。

我尝试在下面执行此操作,但我被困在 .thenApply 子句上。

我无法让这段代码正常工作。在.thenApply 子句中,如何从返回的对象响应中传递参数?

import com.google.common.util.concurrent.Uninterruptibles;
import java.util.concurrent.*;
public class ThreadPoolTest {

    static ExecutorService threadPool = Executors.newFixedThreadPool(10);
    
    public static void main(String[] args) {
        CompletableFuture<SumCalculator> cf =
          CompletableFuture.supplyAsync(() -> new SumCalculator(100000), threadPool);
        Integer initialResult = cf.getNow(null).call();
        CompletableFuture<SumCalculator> cf2 = CompletableFuture.completedFuture(initialResult)
        .thenApplyAsync((i) -> new SumCalculator(i));
     //  i want to call 2 or more SumCalulator tasks here

        System.out.println("DONE? "  + cf2.isDone());
        System.out.println("message? " + cf2.getNow(null).call());
        threadPool.shutdown();
        System.out.println("Program exit.");
    }

    public static class SumCalculator implements Callable<Integer> {
        private int n;
        public SumCalculator(int n) {
            this.n = n;
        }
        public Integer call() {
            int sum = 0;
            for (int i = 1; i <= n; i++) {
                sum += i;
            }
            Uninterruptibles.sleepUninterruptibly(800, TimeUnit.MILLISECONDS);
            return sum;
        }
    }

}

注意:我确实想在 Futures 的末尾收集所有 3 个任务的响应作为组合结果列表,也许作为整数值流?在这种情况下,我想对这些值求和。我想这样做是为了提高多线程的性能。

【问题讨论】:

  • 执行这段代码时会发生什么?
  • 如果第一个计算必须在第二个和第三个可以继续之前完成,那么你不能同时开始它们。我不确定我明白你在问什么。
  • 我正要写一个答案,然后你开始发送垃圾邮件编辑并每隔几分钟更改一次问题。现在它甚至包含多个不一致的地方。
  • 我想我已经完成了。试图简化因为另一个人说他们不理解这个问题。
  • @djangofan 是的,我知道。但是然后在一个(!)大编辑中完成。您甚至可以删除和取消删除问题并在中间进行编辑。甚至更好:在点击帖子之前写一个好的、易于理解的、没有前后矛盾的、明确的问题。

标签: java executorservice completable-future threadpoolexecutor


【解决方案1】:

如果我理解正确:

    CompletableFuture<Integer> one =
            CompletableFuture.supplyAsync(() -> new SumCalculator(100000).call(), threadPool);

    CompletableFuture<Integer> two = one.thenApplyAsync(x -> new SumCalculator(x).call(), threadPool);
    CompletableFuture<Integer> three = one.thenApplyAsync(x -> new SumCalculator(x).call(), threadPool);

    Integer result = one.join() + two.join() + three.join();
    System.out.println(result);

【讨论】:

  • CompletableFuture.allOf(one, two, three); 是干什么用的? joins 已经被阻塞了。
  • 这似乎是我正在寻找的。让我试试……
  • @akuzminykh 我的错,在手机上编译(和思考)代码有点奇怪;确实不需要。
  • 看起来它对我有用。谢谢尤金
  • 我想通了:只是没有正确标记泛型类型。感谢您的帮助!
猜你喜欢
  • 1970-01-01
  • 2013-12-01
  • 1970-01-01
  • 2011-12-16
  • 2017-03-08
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 2018-03-24
相关资源
最近更新 更多