【问题标题】:collecting from parallel stream in java 8从java 8中的并行流中收集
【发布时间】:2017-05-20 08:07:41
【问题描述】:

我想接受一个输入并在其上应用并行流,然后我想输出为列表。输入可以是我们可以应用流的任何列表或任何集合。

我担心的是,如果我们想要输出作为映射它们,我们有一个来自 java 的选项,就像

list.parallelStream().collect(Collectors.toConcurrentMap(args))

但是我看不到以线程安全的方式从并行流中收集以提供列表作为输出的选项。 我在那里看到了另一个使用选项

list.parallelStream().collect(Collectors.toCollection(<Concurrent Implementation>))

这样我们就可以在collect方法中提供各种并发实现。但我认为 java.util.concurrent 中只有 CopyOnWriteArrayList List 实现。我们可以在这里使用各种队列实现,但不会像列表那样。我的意思是我们可以通过变通方法来获取列表。

如果我希望输出为列表,您能否指导我最好的方法是什么?

注意:我找不到与此相关的任何其他帖子,任何参考都会有所帮助。

【问题讨论】:

  • 你不能给它一个Collections.synchronizedList(new ArrayList<>())吗?
  • @OleV.V.没必要。集合对象不需要是线程安全的。

标签: java java-8 java-stream java.util.concurrent


【解决方案1】:

用于接收正在收集的数据的Collection 对象不需要是并发的。你可以给它一个简单的ArrayList

这是因为从并行流中收集的值实际上并未收集到单个 Collection 对象中。每个线程将收集自己的数据,然后所有子结果将合并到一个最终的Collection 对象中。

这在Collector javadoc 中有详细记录,Collector 是您提供给collect() 方法的参数:

<R,A> R collect(Collector<? super T,A,R> collector)

【讨论】:

  • 我想我错过了那部分。我最初的理解是,我们正在传递的集合只会作为单个集合。但我现在的问题是,为什么我们需要Collectors.toConcurrentMap 他们可以使用简单的哈希映射,然后组合,返回。
  • @VipulGoyal 这显然是出于优化目的。合并大的HashMaps 可能会非常昂贵,而ConcurrentHashMap 在他们实现流时已经存在,那么为什么不直接使用它呢?
  • @Eugene 我同意你的观点,合并HashMaps 的成本很高。但我现在想的是,为什么我们没有更好的并发列表实现,而不是CopyOnWriteArrayList ,这非常昂贵。那里的挑战是什么,或者我错过了什么?无论如何,我得到的答案是不同的讨论。
  • @VipulGoyal 如果流(输入)和集合(输出)都是有序的,并发集合将无济于事,因为必须按顺序收集值。但是,如果不需要维护顺序,并且集合是并发的,那么所有并行线程都可以添加到单个结果集合中,而不是通过构建需要合并的中间子结果。
  • @Vipul Goyal:合并两个HashMaps 意味着重新散列其中一个地图的所有条目。相反,合并两个ArrayLists 意味着一个单一的普通内存传输。此外,请记住Collectors.toList() 没有指定返回ArrayList,甚至不是可变列表。所以未来的版本可能会返回一个不同的 List 实现,在构建时更容易合并,但之后无法修改......
【解决方案2】:

But there is no option that I can see to collect from parallel stream in thread safe way to provide list as output。这是完全错误的。

流的全部意义在于,您可以使用非线程安全的集合来实现完全有效的线程安全结果。这是因为流是如何实现的(这是流设计的关键部分)。你可以看到Collector 定义了一个方法supplier,它在每一步都会创建一个新实例。这些实例将在它们之间合并。

所以这是完全线程安全的:

 Stream.of(1,2,3,4).parallel()
          .collect(Collectors.toList());

由于此流中有 4 个元素,因此将创建 4 个 ArrayList 实例,它们将在最后合并为一个结果(假设至少有 4 个 CPU 内核)

另一方面,像toConcurrent 这样的方法会生成一个单一结果容器,所有线程都会将它们的结果放入其中。

【讨论】:

  • …假设至少有四个 CPU 内核。
  • @Holger 我正在努力关注细节,但你的能力远超于此... :) 非常感谢您的评论!
  • 我认为您总体上是对的,但您的推理让我感到困惑。 Collectors.toList 实现确实(如您所说)为正在并行处理的流的每个部分创建一个新的ArrayList,但是合并使用线程不安全的addAll 调用将第二个列表合并到第一个列表中,然后返回第一个列表(而不是创建一个新列表),尽管这仍然是安全的,因为它合并了对,因此一个列表上永远不会有 2 个并发 addAll 调用。
  • @CameronStone 对。这也称为向左折叠。您可以尝试reduce,而无需一直为合并功能创建一个新列表,并看到它同时打破了btw
  • Stream#collect javadoc 中说“如果流是并行的,并且Collector 是并发的,并且...,那么将执行并发减少(参见Collector有关并发减少的详细信息。)”Collectors.toList() 创建一个非并发的Collector 实现。那是什么意思呢?
猜你喜欢
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 2015-08-30
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
相关资源
最近更新 更多