【问题标题】:Default ForkJoinPool executor taking long time默认 ForkJoinPool 执行程序需要很长时间
【发布时间】:2018-01-09 15:55:51
【问题描述】:

我正在使用 CompletableFuture 异步执行从列表源生成的流。

所以我正在测试重载方法,即 CompletableFuture 的“supplyAsync”,其中一种方法仅采用单个供应商参数,而另一种采用供应商参数和执行器参数。 这是两者的文档:

一个

supplyAsync(供应商供应商)

返回一个新的 CompletableFuture,它由在 ForkJoinPool.commonPool() 中运行的任务异步完成,其值通过调用给定的供应商获得。

supplyAsync(Supplier 供应商,Executor 执行者)

返回一个新的 CompletableFuture,它由在给定执行程序中运行的任务异步完成,其值通过调用给定供应商获得。

这是我的测试课:

public class TestCompleteableAndParallelStream {

    public static void main(String[] args) {
        List<MyTask> tasks = IntStream.range(0, 10)
                .mapToObj(i -> new MyTask(1))
                .collect(Collectors.toList());
        
        useCompletableFuture(tasks);
        
        useCompletableFutureWithExecutor(tasks);

    }
    
    public static void useCompletableFutureWithExecutor(List<MyTask> tasks) {
          long start = System.nanoTime();
          ExecutorService executor = Executors.newFixedThreadPool(Math.min(tasks.size(), 10));
          List<CompletableFuture<Integer>> futures =
              tasks.stream()
                   .map(t -> CompletableFuture.supplyAsync(() -> t.calculate(), executor))
                   .collect(Collectors.toList());
         
          List<Integer> result =
              futures.stream()
                     .map(CompletableFuture::join)
                     .collect(Collectors.toList());
          long duration = (System.nanoTime() - start) / 1_000_000;
          System.out.printf("Processed %d tasks in %d millis\n", tasks.size(), duration);
          System.out.println(result);
          executor.shutdown();
        }
    
    public static void useCompletableFuture(List<MyTask> tasks) {
          long start = System.nanoTime();
          List<CompletableFuture<Integer>> futures =
              tasks.stream()
                   .map(t -> CompletableFuture.supplyAsync(() -> t.calculate()))
                   .collect(Collectors.toList());
         
          List<Integer> result =
              futures.stream()
                     .map(CompletableFuture::join)
                     .collect(Collectors.toList());
          long duration = (System.nanoTime() - start) / 1_000_000;
          System.out.printf("Processed %d tasks in %d millis\n", tasks.size(), duration);
          System.out.println(result);
        }
    
    

}


class MyTask {
      private final int duration;
      public MyTask(int duration) {
        this.duration = duration;
      }
      public int calculate() {
        System.out.println(Thread.currentThread().getName());
        try {
          Thread.sleep(duration * 1000);
        } catch (final InterruptedException e) {
          throw new RuntimeException(e);
        }
        return duration;
      }
    }

“useCompletableFuture”方法大约需要 4 秒才能完成,而“useCompletableFutureWithExecutor”方法只需 1 秒即可完成。

不,我的问题是,ForkJoinPool.commonPool() 有什么不同的处理可以做开销?那么我们不应该总是更喜欢自定义执行器池而不是 ForkJoinPool 吗?

【问题讨论】:

    标签: java-8 executorservice threadpoolexecutor completable-future forkjoinpool


    【解决方案1】:

    检查ForkJoinPool.commonPool() 大小。默认情况下,它会创建一个大小为

    的池
    Runtime.getRuntime().availableProcessors() - 1
    

    我在我的 Intel i7-4800MQ(4 个核心 + 4 个虚拟核心)上运行您的示例,在我的情况下,公共池的大小为 7,因此整个计算耗时约 2000 毫秒:

    ForkJoinPool.commonPool-worker-1
    ForkJoinPool.commonPool-worker-4
    ForkJoinPool.commonPool-worker-2
    ForkJoinPool.commonPool-worker-6
    ForkJoinPool.commonPool-worker-5
    ForkJoinPool.commonPool-worker-3
    ForkJoinPool.commonPool-worker-7
    ForkJoinPool.commonPool-worker-4
    ForkJoinPool.commonPool-worker-2
    ForkJoinPool.commonPool-worker-1
    Processed 10 tasks in 2005 millis
    [1, 1, 1, 1, 1, 1, 1, 1, 1, 1]
    

    在第二种情况下你使用

    Executors.newFixedThreadPool(Math.min(tasks.size(), 10));
    

    所以池中有 10 个线程准备好执行计算,所以所有任务都在 ~1000 毫秒内运行:

    pool-1-thread-1
    pool-1-thread-2
    pool-1-thread-3
    pool-1-thread-4
    pool-1-thread-5
    pool-1-thread-6
    pool-1-thread-7
    pool-1-thread-8
    pool-1-thread-9
    pool-1-thread-10
    Processed 10 tasks in 1002 millis
    [1, 1, 1, 1, 1, 1, 1, 1, 1, 1]
    

    ForkJoinPoolExecutorService 之间的区别

    Eugene 在他的评论中还提到了一件更重要的事情。 ForkJoinPool 使用工作窃取方法:

    ForkJoinPool 与其他类型的 ExecutorService 的不同之处主要在于采用了工作窃取:池中的所有线程都尝试查找并执行提交到池和/或由其他活动任务创建的任务(最终阻塞等待如果不存在则工作)。当大多数任务产生其他子任务(大多数 ForkJoinTasks 也是如此)时,以及当许多小任务从外部客户端提交到池时,这可以实现高效处理。尤其是在构造函数中将 asyncMode 设置为 true 时,ForkJoinPools 也可能适用于从未加入的事件式任务。

    而使用.newFixedThreadPool() 创建的ExecutorService 使用分而治之的方法。

    如何确定池大小?

    有一个关于什么是最佳线程池大小的问题,您可以在那里找到有用的信息:

    Setting Ideal size of Thread Pool

    这个帖子也是一个调查的好地方:

    Custom thread pool in Java 8 parallel stream

    【讨论】:

    • @SzymonStepniak 你实际上需要对虚拟机进行相当好的预热才能对速度做出任何理智的结论,而这段代码没有。还有 more 线程然后实际 CPU(甚至是虚拟的)是不好的。
    • 不能再同意了。我的回答仅限于解释为什么 KayV 看到 commonPool 与 10 个线程的固定大小池的行为差异。我不建议使用比 CPU 数量更多的线程。
    • @SzymonStepniak 这些池的工作原理也是完全不同的——一个是分工(分而治之),另一个是窃取——完全不同的实现
    • 必须强调的是,在这个由Thread.sleep 组成的人为示例中,使用比内核数更多的线程有很大帮​​助。对于实际任务,公共池的默认并行度可能更合理。工作窃取在这里无关紧要,因为最后,工作线程在任何一种情况下都只处理排队的任务。在这里都没有使用“分而治之的方法”。
    • @Eugene:我建议你重新阅读那里的帖子。首先,“偷工减料”和“分而治之”并不是矛盾的东西,事实上,两者通常都归于 F/J 框架。由于其他线程池执行器,例如通过newFixedThreadPool() 构造的线程池执行器只有一个队列,它们总是会做某种“工作窃取”,尽管这样称呼它没有多大意义,因为你需要本地排队称其为实际偷窃。另一方面,“分而治之”是一种解决问题的策略,您可以在任何 executor 上实施,但最好在 F/J 上实施。
    【解决方案2】:

    进一步检查互联网上的解决方案,我发现我们可以使用以下属性更改 ForkJoinPool 采用的默认池大小:

    -Djava.util.concurrent.ForkJoinPool.common.parallelism=16
    

    因此,此属性可以进一步帮助以更有效的方式和更多的并行性来使用 ForkJoinPool。

    【讨论】:

    猜你喜欢
    • 1970-01-01
    • 2021-09-19
    • 1970-01-01
    • 2012-12-03
    • 2011-03-14
    • 2021-01-15
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多