累加值的过程可以在collector内部处理。
在这种情况下,不需要将 当前值 存储在流管道之外并通过 副作用 对其进行更新,而API documentation.
自定义收集器
为此,我们需要定义一个自定义收集器。可以实现为实现Collector接口的类,也可以使用静态方法Collector.of()。
这些是Collector.of() 期望的参数:
-
Supplier Supplier<A> 旨在提供一个存储流元素的可变容器。在这种情况下,ArrayDeque(作为Deque 接口的实现)将作为一个容器方便地访问之前添加的元素。
-
Accumulator BiConsumer<A,T> 定义了如何将元素添加到 supplier 提供的 container 中。在累加器中,我们需要确保 deque 在访问最后一个元素之前不为空。 注意: 下面提供的解决方案中的对被视为不可变的(我已将其重新实现为 record),因此第一对是按原样使用,其他人将被恢复。
-
Combiner BinaryOperator<A> combiner() 建立了一个规则,用于合并并行执行流时获得的两个容器。该任务本质上可以是顺序的,将其拆分为子任务并并行执行是没有意义的。因此,combiner 被实现为在并行执行的情况下抛出 AssertionError。
-
Finisher Function<A,R> 旨在通过转换可变容器来产生最终结果。下面代码中的 finisher 函数将 container(包含结果的 deque)变成了一个不可变列表 .
-
特征 允许提供附加信息,例如Collector.Characteristics.UNORDERED,在这种情况下使用表示并行执行时产生的部分归约结果的顺序并不重要。这个收集器不需要任何特征。
实施
public static List<NameValuePair> accumulateValues(List<NameValuePair> pairs) {
return pairs.stream()
.collect(getPairAccumulator());
}
public static Collector<NameValuePair, ?, List<NameValuePair>> getPairAccumulator() {
return Collector.of(
ArrayDeque::new, // mutable container
(Deque<NameValuePair> deque, NameValuePair pair) -> {
if (deque.isEmpty()) deque.add(pair);
else deque.add(new NameValuePair(pair.name(), deque.getLast().value() + pair.value()));
},
(left, right) -> { throw new AssertionError("should not be executed in parallel"); }, // combiner - function responsible
(Deque<NameValuePair> deque) -> deque.stream().toList() // finisher function
);
}
如果您使用的是 Java 16 或更高版本,则可以将 NameValuePair 实现为 record:
public record NameValuePair(String name, int value) {}
main()
public static void main(String[] args) {
List<NameValuePair> pairs =
List.of(new NameValuePair("a", 2), new NameValuePair("b", 12),
new NameValuePair("c", 15), new NameValuePair("d", 20));
List<NameValuePair> result = accumulateValues(pairs);
result.forEach(System.out::println);
}
输出:
NameValuePair[name=a, value=2]
NameValuePair[name=b, value=14]
NameValuePair[name=c, value=29]
NameValuePair[name=d, value=49]
A link to Online Demo