【问题标题】:Synchronization issues with parallel streams on reduce operationsreduce 操作中并行流的同步问题
【发布时间】:2020-03-10 10:37:43
【问题描述】:

我正在尝试使用并行流连接字符串。

StringBuffer concat = Arrays.stream(grades)
        .parallel()
        .reduce(
                new StringBuffer(),
                (sb, s) -> sb.append(s),
                (sb1, sb2) -> sb1.append(sb2)
        );

即使使用收集器(可变减少)会是更好的方法。我想知道为什么这没有返回正确的结果。

例如List<String> grades = List.of("A", "B");

虽然此管道的非并行版本运行良好。我看到的结果是BABA,而它应该只是AB

我已经在使用StringBuffer,它是线程安全的,而不是StringBuilder

我也发现以下代码存在同样的问题。

List<Integer> ages = people
            .stream()
            .parallel()
            .reduce(
                Collections.synchronizedList(new ArrayList<>()),
                (list, p) -> { list.add(p.getAge()); return list; },
                (list1, list2) -> { list1.addAll(list2) ; return list1; }
            );

这里我也使用了一个同步的集合,所有的方法都是线程安全的。

我在Java docs 上看到了这个

但是,有充分的理由更喜欢 reduce 操作而不是 像上面这样的突变积累。不仅是减少“更多 抽象”——它作为一个整体在流上运行,而不是 单个元素——但正确构造的 reduce 操作是 本质上可并行化,只要用于处理的函数 这些元素是关联的和无状态的。例如,给定一个 我们想要求和的数字流,我们可以这样写:

int sum = numbers.stream().reduce(0, (x,y) -> x+y);   or:

int sum = numbers.stream().reduce(0, Integer::sum);   These reduction operations can run safely in parallel with almost no

修改:

int sum = numbers.parallelStream().reduce(0, Integer::sum);   Reduction parallellizes well because the implementation can operate on

数据的子集并行,然后合并中间 结果得到最终的正确答案。 (即使语言有 “parallel for-each”构造,变异累积方法 仍然需要开发人员提供线程安全更新 共享的累加变量和,以及所需的同步 然后可能会消除并行性带来的任何性能提升。) 使用 reduce() 代替了并行化的所有负担 减少操作,并且该库可以提供高效的并行 无需额外同步即可实现。

据我了解,平行归约是很有可能的。

我在这里遗漏了什么吗?使用线程安全的数据结构还不够吗?

【问题讨论】:

  • 继续阅读该部分,包括后续的“可变归约”主题。顺便说一句,您使用同步列表的代码仍然存在问题。

标签: java multithreading collections java-8 java-stream


【解决方案1】:

当您执行new StringBuffer() 时,您正在创建对单个缓冲区的引用。当您执行.parallel() 时,两个并行流都会传递此引用,从而在同一个可变缓冲区上运行。空缓冲区首先用“B”减少,然后用“A”减少,然后减少到自身,产生“BABA”。

要使用可变结构(例如 StringBuffers)执行类似操作,请尝试使用 .collect() 代替:

StringBuffer concat = Arrays.stream(grades).parallel().collect(
    () -> new StringBuffer(),
    (sb, s) -> sb.append(s),
    (sb1, sb2) -> sb1.append(sb2));

【讨论】:

  • 好的。所以对于collectors,第一个参数是ListSupplier,而不是对List 本身的引用(reduce 就是这种情况)。所以每个线程都会创建一个新容器。我说的对吗?
  • 正确。从关于第一个参数的文档中:“供应商 - 一个创建新结果容器的函数。对于并行执行,此函数可能会被多次调用,并且每次都必须返回一个新值。”
  • 请注意,ABAB 的结果也是可能的,这仍然是StringBuffer 在这种使用方面的稳健性的标志,然而,例如将List 添加到自身的行为完全未指定。乐趣始于两个以上的元素……顺便说一下,collect 的这种(正确)使用消除了同步的需要,因此它也可以与StringBuilder 一起使用。可简写为.collect(StringBuilder::new, StringBuilder::append, StringBuilder::append)
  • @Holger 但是addAll 上的synchronizedList 是线程安全的,对吗?我认为它的行为与StringBuffer 相同。我尝试了 4 个元素的列表,它的行为与 StringBuffer 示例相同。
  • @GeekFactory 线程安全在这里无关紧要。 The specification 说:“如果在操作进行时修改了指定的集合,则此操作的行为未定义。 (这意味着如果指定的集合是这个列表,并且这个列表是非空的,那么这个调用的行为是未定义的。)” 所以给自己添加一个ArrayList 有一个未定义的行为。它可能会在特定环境下通过一种实现表现出特定的行为,但这并不能保证
猜你喜欢
  • 2019-06-12
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 2019-05-27
相关资源
最近更新 更多