【问题标题】:Parallel stream from a HashSet doesn't run in parallel来自 HashSet 的并行流不并行运行
【发布时间】:2015-05-13 04:31:38
【问题描述】:

我有一些想要并行处理的元素。当我使用List 时,并行性有效。但是,当我使用Set 时,它不会并行运行。

我写了一个代码示例来说明问题:

public static void main(String[] args) {
    ParallelTest test = new ParallelTest();

    List<Integer> list = Arrays.asList(1,2);
    Set<Integer> set = new HashSet<>(list);

    ForkJoinPool forkJoinPool = new ForkJoinPool(4);

    System.out.println("set print");
    try {
        forkJoinPool.submit(() ->
            set.parallelStream().forEach(test::print)
        ).get();
    } catch (Exception e) {
        return;
    }

    System.out.println("\n\nlist print");
    try {
        forkJoinPool.submit(() ->
            list.parallelStream().forEach(test::print)
        ).get();
    } catch (Exception e) {
        return;
    }   
}

private void print(int i){
    System.out.println("start: " + i);
    try {
        TimeUnit.SECONDS.sleep(1);
    } catch (InterruptedException e) {
    }
    System.out.println("end: " + i);
}

这是我在 Windows 7 上得到的输出

set print
start: 1
end: 1
start: 2
end: 2

list print
start: 2
start: 1
end: 1
end: 2

我们可以看到Set 中的第一个元素必须在处理第二个元素之前完成。对于List,第二个元素在第一个元素结束之前开始。

你能告诉我是什么导致了这个问题,以及如何使用Set 集合来避免它吗?

【问题讨论】:

  • 尝试使用两个以上的元素,例如 10. 个元素或其他东西。 2 的结果太模糊了
  • 当您尝试使用 10 时,您仍然无法并行所有集合元素。而且我需要并行运行所有元素。
  • 无论如何,这是 10 个(有 10 个执行器池)元素的输出设置打印开始:8 开始:0 开始:4 开始:6 开始:2 结束:2 结束:6 结束:4结束:0 开始:1 结束:8 开始:9 开始:5 开始:7 开始:3 结束:3 结束:5 结束:9 结束:7 结束:1 列表打印开始:7 开始:3 开始:0 开始:6开始:9 开始:8 开始:5 开始:4 开始:2 开始:1 结束:0 结束:6 结束:7 结束:9 结束:2 结束:3 结束:8 结束:5 结束:1 结束:4 不是全部设置元素并行运行

标签: java lambda parallel-processing java-8 java-stream


【解决方案1】:

我可以重现您看到的行为,其中并行度与您指定的 fork-join 池并行度的并行度不匹配。在将 fork-join 池并行度设置为 10 并将集合中的元素数量增加到 50 后,我看到基于列表的流的并行度仅上升到 6,而基于集合的流的并行度从未超过2.

但是请注意,这种将任务提交到 fork-join 池以在该池中运行并行流的技术是一种实现“技巧”,并且不能保证正常工作。实际上,用于执行并行流的线程或线程池是未指定的。默认情况下,使用公共的 fork-join 池,但在不同的环境中,最终可能会使用不同的线程池。 (考虑一个应用服务器中的容器。)

java.util.stream.AbstractTask 类中,LEAF_TARGET 字段决定了完成的拆分量,进而决定了可以实现的并行量。该字段的值基于ForkJoinPool.getCommonPoolParallelism(),它当然使用公共池的并行性,而不是碰巧运行任务的任何池。

可以说这是一个错误(请参阅 OpenJDK 问题 JDK-8190974),但是,无论如何,这整个区域都未指定。但是,系统的这个领域肯定需要开发,例如在拆分策略、可用并行量、处理阻塞任务等方面。 JDK 的未来版本可能会解决其中一些问题。

同时,可以通过使用系统属性来控制公共 fork-join 池的并行性。如果将此行添加到程序中,

System.setProperty("java.util.concurrent.ForkJoinPool.common.parallelism", "10");

并且您在公共池中运行流(或者如果您将它们提交到具有足够高并行度集的自己的池),您将观察到更多任务并行运行。

您也可以使用-D 选项在命令行上设置此属性。

同样,这不是保证行为,将来可能会发生变化。但在可预见的未来,这种技术可能会适用于 JDK 8 的实现。

2019-06-12 更新:错误 JDK-8190974 已在 JDK 10 中修复,并且该修复已向后移植到即将发布的 JDK 8u 版本 (8u222)。

【讨论】:

  • @SotiriosDelimanolis 也可以在这里查看 Dimitar 的评论。我看到你们也在other question 上讨论这个问题
  • @DimitarDimitrov 我认为这比你想象的要简单。 “可以说这是一个错误”声明是关于流中的拆分行为。它总是根据公共池的并行性进行拆分。但是,如果流的目标是另一个池(使用未记录的 hack),则拆分仍然受公共池的并行性控制,而不是目标池的并行性。
  • @StuartMarks 是的,你对AbstractTask 的行为是完全正确的,而我关于ForkJoinPool 背压是根本原因的回答是错误的。感谢你们对我的包容-我会相应地更新我的答案。至于@SotiriosDelimanolis 在这里的评论,无论这是否可以归类为错误以及是否应支持使用非默认池,修复此问题可能需要进行更多实质性更改,因为目前流不知道并行度级别它将在其中运行的池。
  • @pppavan 可能是因为 ArrayList 的元素密集打包在一个数组中,所以每次拆分都是满的。 HashSet 的元素相对稀疏地分布在其表中的存储桶中。
  • @d-coder 可能是因为已经创建了 FJ 池。创建后,您无法通过更改属性来更改其大小。尝试将属性设置在main() 的顶部。在main() 创建FJ 池之前,我不认为有任何事情(但这可能因JDK 版本而异)。如果失败,请尝试在命令行上设置属性。
猜你喜欢
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 2021-08-31
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
相关资源
最近更新 更多