【问题标题】:How to reduce # of threads in Java parallelStream?如何减少 Java parallelStream 中的线程数?
【发布时间】:2016-01-06 11:11:36
【问题描述】:

我有一个带有parallelStream() 的测试代码,它向服务器机器发送请求。

Report report = 
    requestsList.parallelStream()
                .map(request -> freshResultsGenerator.getResponse(request, e2EResultLongBL))
                .map(response -> resultsComparer.compareToBl(response, e2EResultLongBL,
                            astarHistogramsArrayBl, latencyHistogramBl))
                .reduce(null,
                        (sumReport, compare2) ->
                        {
                            if (sumReport == null) {
                                sumReport = new Report();
                            }
                            sumReport.add(compare2);
                            return sumReport;
                        },
                        (report1, report2) ->
                        {
                            Report report3 = new Report();
                            report3.add(report1);
                            report3.add(report2);
                            return report3;
                        });

这台机器的负载太大,很快它就会返回 HTTP 404 错误。

有两件事我在 Google 上没有找到答案:

  1. 如果没有自定义,parallelStream 的默认线程数是多少 放?
  2. 如何将工作线程数设置为 4?

【问题讨论】:

    标签: java multithreading lambda parallel-processing java-stream


    【解决方案1】:

    Stream API 使用ForkJoinPool 来执行并发任务。引用其文档:

    默认情况下,公共池是使用默认参数构造的,但可以通过设置三个系统属性来控制这些参数:

    • java.util.concurrent.ForkJoinPool.common.parallelism - 并行度,非负整数
      ...

    还有

    ForkJoinPool 可以用给定的目标并行度级别构造;默认情况下,等于可用处理器的数量。

    所以要自定义线程数,可以将系统属性java.util.concurrent.ForkJoinPool.common.parallelism设置为你想要的值:

    System.setProperty("java.util.concurrent.ForkJoinPool.common.parallelism", "4")
    

    将工作线程数设置为 4。默认情况下,线程数将等于您拥有的处理器数。

    【讨论】:

    • 更改属性最好更改默认值。如果您需要使用专用的 ForkJoinPool 为不同的流提供不同的值,那将是更好的解决方案。
    【解决方案2】:

    ParallelStream 使用 ForkJoinPool.commonPool() 初始化为您的核心数 - 1

    可以按照here 的描述将您自己的ForkJoin Executor 传递给parallelStream,但执行器必须是ForkJoin,这不是IO 绑定任务的最佳选择。

    【讨论】:

    • 谢谢。为什么“这不是 IO 绑定任务的最佳选择。”?
    • @EladBenda 任何分叉加入执行者
    【解决方案3】:

    您可以在自定义 ForkJoinPool 中启动您的管道,它将用于获取结果:

    ForkJoinPool fjp = new ForkJoinPool(2);
    System.out.println("My pool: " + fjp);
    String result = CompletableFuture.supplyAsync(
        () -> Stream.of("a", "b", "c").parallel()
        .peek(x -> System.out.println(
            ((ForkJoinWorkerThread) Thread.currentThread()).getPool()))
        .collect(Collectors.joining()), fjp).join();
    System.out.println(result);
    

    我的StreamEx 库添加了一个语法糖方法.parallel(fjp) 以使其更简单:

    String result = StreamEx.of("a", "b", "c")
        .parallel(fjp)
        .peek(x -> System.out.println(
            ((ForkJoinWorkerThread) Thread.currentThread()).getPool()))
        .collect(Collectors.joining());
    

    【讨论】:

      猜你喜欢
      • 2018-11-24
      • 2015-08-28
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      相关资源
      最近更新 更多