【问题标题】:Java 8 stream combiner never calledJava 8 流组合器从未调用过
【发布时间】:2016-10-26 19:00:26
【问题描述】:

我正在编写一个自定义的 java 8 收集器,它应该计算具有 getValue() 方法的 POJO 的平均值。代码如下:

public static Collector<BoltAggregationData, BigDecimal[], BigDecimal> avgCollector = new Collector<BoltAggregationData, BigDecimal[], BigDecimal>() {

        @Override
        public Supplier<BigDecimal[]> supplier() {
            return () -> {
                BigDecimal[] start = new BigDecimal[2];
                start[0] = BigDecimal.ZERO;
                start[1] = BigDecimal.ZERO;
                return start;
            };
        }

        @Override
        public BiConsumer<BigDecimal[], BoltAggregationData> accumulator() {
            return (a,b) ->  {
                a[0] = a[0].add(b.getValue());
                a[1] = a[1].add(BigDecimal.ONE);
            };
        }

        @Override
        public BinaryOperator<BigDecimal[]> combiner() {
            return (a,b) -> {
                a[0] = a[0].add(b[0]);
                a[1] = a[1].add(b[1]);
                return a;
            };
        }

        @Override
        public Function<BigDecimal[], BigDecimal> finisher() {
            return (a) -> {
                return a[0].divide(a[1], 6 , RoundingMode.HALF_UP);
            };
        }

        private final Set<Characteristics> CHARACTERISTICS = new HashSet<Characteristics>(Arrays.asList(Characteristics.CONCURRENT, Characteristics.UNORDERED));

        @Override
        public Set<Characteristics> characteristics() {
            return CHARACTERISTICS;
        }

    };

在非并行情况下一切正常。但是,当我使用parallelStream() 时,它有时不起作用。例如,给定从 1 到 10 的值,它会计算(53/9 而不是 55/10)。调试时,调试器永远不会命中 combiner() 函数中的断点。我需要设置某种标志吗?

【问题讨论】:

  • 我确实对两者都投了赞成票,也感谢您的回答:) 我只是发现另一个答案更清楚了。也感谢 EnumSet 的提示。
  • 没关系,我只是注意到您在短时间内接受(或试图接受)两者,所以我只是想消除潜在的混淆。
  • 请注意,有更好的方法可以做到这一点,例如cumulative moving average
  • 附带说明,如果您以任何方式使用匿名类,您仍然可以使用 Collector.of 缩小代码大小...
  • 如果我们有其他改进代码的方法,可以使用数组初始化器:BigDecimal[] start = { BigDecimal.ZERO, BigDecimal.ZERO }; return start;() -&gt; new BigDecimal[] { BigDecimal.ZERO, BigDecimal.ZERO },使用 … -&gt; expression 形式,这也适用于整理器:a -&gt; a[0].divide(a[1], 6 , RoundingMode.HALF_UP)

标签: java java-8 java-stream collectors


【解决方案1】:

嗯,这正是您在指定 Characteristics.CONCURRENT 时所要求的:

表示这个收集器是并发的,意味着结果容器可以支持累加器函数被多个线程与同一个结果容器并发调用。

如果不是这样,就像你的 Collector 一样,你不应该指定那个标志。


作为旁注,new HashSet&lt;Characteristics&gt;(Arrays.asList(Characteristics.CONCURRENT, Characteristics.UNORDERED)); 在指定特征方面效率很低。你可以使用EnumSet.of(Characteristics.CONCURRENT, Characteristics.UNORDERED)。当您删除错误的并发特性时,您可以使用EnumSet.of(Characteristics.UNORDERED)Collections.singleton(Characteristics.UNORDERED),但HashSet 绝对是矫枉过正。

【讨论】:

    【解决方案2】:

    看起来问题出在 CONCURRENT 特征上,它的作用超出了您的想象:

    表示这个收集器是并发的,意思是 结果容器可以支持累加器功能 与来自多个的相同结果容器同时调用 线程。

    而不是调用组合器,累加器被并发调用,对所有线程使用相同的BigDecimal[] aa 的访问不是原子的,所以会出错:

    Thread1 -> retrieves value of a[0]: 3
    Thread2 -> retrieves value of a[0]: 3
    Thread1 -> adds own value: 3 + 3 = 6
    Thread2 -> adds own value: 3 + 4 = 7
    Thread1 -> writes 6 to a[0]
    Thread2 -> writes 7 to a[0]
    

    a[0] 的值设置为 7 而应该是 10。a[1] 可能会发生同样的事情,因此结果可能会不一致。


    如果您删除 CONCURRENT 特征,则将使用组合器。

    【讨论】:

    • 是否可以使用AtomicInteger(或AtomicLong)代替BigInteger?那么可以使用CONCURRENT 特性吗?
    • @dcsohl 在测试过程中,我发现用synchronized(this){...} 包围累加器中的 to 行也解决了这个问题。但是我的直觉告诉我,不应该强制使用这个特性,而是在结果容器以任何方式支持并发操作的情况下使用。
    • @dcsohl:当然,使累加器函数线程安全可以解决问题,正如CONCURRENT 特性所暗示的那样,该函数是线程安全的。但是,它也表明并发评估相对于局部累积加合并的性能优势,这不是这里的情况(很少出现这种情况)。
    猜你喜欢
    • 1970-01-01
    • 1970-01-01
    • 2015-03-04
    • 2018-12-17
    • 1970-01-01
    • 2014-08-03
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多