【问题标题】:What is actual role of combiner in java parallelStream reduce组合器在 java parallelStream reduce 中的实际作用是什么
【发布时间】:2021-02-13 10:57:06
【问题描述】:

我对java流中的reduce方法有基本的了解。但是,组合器在parallelStream 中的作用我并不清楚。在下面的代码 sn-p 中,在第一个块中我使用了一个组合器,而它在第二个块中不存在。但是,两种情况下的结果是相同的。

List<Integer> intarr = Arrays.asList(10,20,30);
        Integer totsum = intarr.stream().reduce(20, (a,b) -> a+b, (a,b) -> a+b);
        System.out.println("total sum: "+totsum);

List<Integer> intarr = Arrays.asList(10,20,30);
        Integer totsum = intarr.stream().reduce(20, (a,b) -> a+b);
        System.out.println("total sum: "+totsum);

据了解,在2参数reduce方法的情况下,累加器是BinaryOperator,在3参数reduce方法的情况下是BiFunction。众所周知,这将有助于类型转换。例如,如果我需要将 int 转换为 double,我可以将我的标识指定为 20.0,combiner 会将其输出为 double 类型。

但是,除了类型转换之外,使用组合器实际上可以获得什么优势?

【问题讨论】:

    标签: java mapreduce java-stream reduce


    【解决方案1】:

    combiner 的重点是处理结果和输入是不同类型的情况,这里不是这种情况。例如,如果你想要一个字符串连接,你可以用reduce 来实现,写

    stream.reduce("", (String str, int i) -> str + i, (String a, String b) -> a + b)
    

    ...在这种情况下,当流parallel缩减时,不同的chunk会分别累加,然后与combiner合并。

    【讨论】:

      【解决方案2】:

      并不是说使用组合器比不使用组合器更“有利”。 reduce 的两个重载用于不同的目的。正如您所确定的,类型转换。

      reduce(T, BinaryOperator&lt;T&gt;),没有组合器,返回一个T - 与流管道中的相同类型的东西。另一方面,reduce(U, BiFunction&lt;U, ? super T, U&gt;, BinaryOperator&lt;U&gt;) 使用组合器返回 U - 与流中的类型不同。

      BigInteger sum = bigIntegerStream.reduce(BigInteger.ZERO, BigInteger::add);
      

      我可以使用无合并器的版本来添加BigIntegers 的流。流将是Stream&lt;BigInteger&gt;,我想要的结果也是BigInteger 类型。我可以使用组合器版本将Stream&lt;Long&gt;BigInteger 相加。请注意流的类型与我想要的结果类型有何不同。

      BigInteger sum = longStream.reduce(BigInteger.ZERO, (bi, l) -> bi.add(BigInteger.valueOf(l)), BigInteger::add);
      

      可以说,您也可以在使用无合并器 reduce 之前完成 map

      当结果类型与流的类型不同时需要组合器的原因是因为reduce 操作可以并行运行。例如,可以将整个流分成几个部分,每个部分并行缩减。

      如果结果类型与流类型T 相同,则使用提供的二元运算符将每个部分缩减为T,并留下一堆Ts。我们可以使用相同的二元运算符进一步将这组Ts 减少为单个T

      如果结果类型U 与流类型U 不同,那么BiFucntion 会将每个部分缩减为U,剩下的就是一堆Us不知道该怎么办,因为BiFunction 只需要UT 并返回U。我们需要一个额外的 BinaryOperator&lt;U&gt; 来帮助我们合并 Us。

      所以如果你想要不同的结果类型,组合器是必要的。


      另外,20 不是有效的标识值。一个有效的身份必须满足

      accumulator.apply(identity, u) == u
      

      对于所有u,对于无合并器版本,以及

      combiner.apply(u, accumulator.apply(identity, t)) == accumulator.apply(u, t)
      

      适用于所有 ut,适用于合并器版本。

      【讨论】:

        【解决方案3】:

        3-arg version 的签名是:

        <U> U reduce(U identity,
                     BiFunction<U,? super T,U> accumulator,
                     BinaryOperator<U> combiner)
        

        它打算在以下一项(或两项)为真时使用:

        • 结果类型 (U) 与流元素类型 (T) 不同。
        • combineraccumulator 不同。

        如果这些都不正确,那么2-arg version 更好(更简单、更容易):

        T reduce(T identity,
                 BinaryOperator<T> accumulator)
        

        对于问题中显示的示例,使用 3-arg 版本没有任何优势。


        具有不同类型的示例很多,例如查看其他答案。

        类型相同但combineraccumulator 的示例适用于- 减号运算符:

        List<Integer> intarr = Arrays.asList(10,20,30);
        
        // Sequential processing doesn't use combiner: totsum = -60
        Integer totsum = intarr.stream().reduce(0, (a,b) -> a - b, (a,b) -> a - b);
        
        // Parallel processing with same combiner does work: totsum = -20
        Integer totsum = intarr.parallelStream().reduce(0, (a,b) -> a - b);
        
        Integer totsum = intarr.parallelStream().reduce(0, (a,b) -> a - b, (a,b) -> a - b);
        
        // Parallel processing requires a different combiner: totsum = -60
        Integer totsum = intarr.parallelStream().reduce(0, (a,b) -> a - b, (a,b) -> a + b);
        

        那是因为通过并行处理,我们为该输入流获得了 3 个线程,所以代码变为:

        thread1Result = accumulator.apply(0/*identity*/, 10);  // = 0 - 10 = -10
        
        thread2Result = accumulator.apply(0/*identity*/, 20);  // = 0 - 20 = -20
        
        thread3Result = accumulator.apply(0/*identity*/, 30);  // = 0 - 30 = -30
        
        // Bad combiner: (a,b) -> a - b
        result = combiner.apply(thread1Result, thread2Result); // = -10 - -20 = +10
        result = combiner.apply(result, thread3Result);        // = +10 - -30 = -20
        
        // Good combiner: (a,b) -> a + b
        result = combiner.apply(thread1Result, thread2Result); // = -10 + -20 = -30
        result = combiner.apply(result, thread3Result);        // = -30 + -30 = -60
        

        【讨论】:

        • "// 并行处理需要不同的组合器:totsum = -20" 应该是-60?
        • @samabcde 将所有复制/粘贴错误都搞砸了。谢谢!!
        猜你喜欢
        • 1970-01-01
        • 1970-01-01
        • 1970-01-01
        • 1970-01-01
        • 2015-06-25
        • 2019-05-31
        • 1970-01-01
        • 2017-04-23
        • 2019-11-20
        相关资源
        最近更新 更多