【问题标题】:Do stream operations in a parallel stream wait untill the previous stream operation has processed all elements并行流中的流操作等到前一个流操作处理完所有元素
【发布时间】:2020-01-19 16:53:43
【问题描述】:

我想知道并行流的流管道中的流操作是否等待前一个流操作完成所有流元素的处理。

如果我有以下流管道:

List <String> parsedNumbers = IntStream.range(1, 6)
    .parallel()
    .map(String::valueOf)
    .map(integerAsString => {
        System.out.println("First print statement: " + integerAsString);
        return integerAsString;
    })
    .map(integerAsString => {
        System.out.println("Second print statement: " + integerAsString);
        return integerAsString;
    })
    .collect(Collectors.toList());

会不会已经为元素 X 调用了 System.out.println("First print statement: " + integerAsString),但仍在为流中的另一个元素 Y 执行 String::parseInt 操作?

能否这段代码的输出如下:

第一个打印语句:1
第一个打印语句:2
第一个打印语句:3
第二次打印声明:1
第二个打印语句:2
第一个打印语句:4
第二个打印语句:3
第二次打印声明:4
第一个打印语句:5
第二个打印语句:5

它会总是是这样的吗:

第一个打印语句:1
第一个打印语句:2
第一个打印语句:3
第一个打印语句:4
第一个打印语句:5
第二次打印声明:1
第二个打印语句:2
第二个打印语句:3
第二次打印声明:4
第二个打印语句:5

【问题讨论】:

  • 为什么这相关? Streams 应该没有副作用,所以不用在意。
  • 不鼓励但不禁止副作用。不过我同意。我只是想知道前几天的订单。
  • 这些操作在顺序流中甚至不能以这种方式工作。除此之外,.map(String::parseInt) 毫无意义。将int 转换为String 不是解析,因此不存在这种方法。
  • parseInt 确实不存在,我的错。我更新了代码以使用正确的方法。

标签: java java-8 parallel-processing java-stream


【解决方案1】:

是的,它可以。 Intermediate 阶段可以按任何顺序执行,terminal 操作具有定义的顺序,如果流的源具有顺序(例如与 Set 不同)并且流本身确实不要改变那个顺序(调用unordered - 虽然目前这并没有多大作用)。

也就是说:您并不真正知道在给定时间点哪个元素将流经某个阶段,对于并行流如何处理元素没有顺序。

更大的问题是为什么你在乎?中间操作应该是无副作用的,依赖任何顺序都是一个坏主意。

【讨论】:

  • 不鼓励但不禁止副作用。不过我同意。我只是想知道前几天的订单。
  • @Titulum 确切地说,我所知道的最著名的副作用是 distinctBy
【解决方案2】:

无法保证处理顺序,即使对于顺序流也是如此。只有最终结果才会与遭遇顺序一致,如果数据有的话。

当你运行以下顺序代码时

List<String> parsedNumbers = IntStream.range(1, 6)
    .mapToObj(String::valueOf)
    .map(integerAsString -> {
        System.out.println("First print statement: " + integerAsString);
        return integerAsString;
    })
    .map(integerAsString -> {
        System.out.println("Second print statement: " + integerAsString);
        return integerAsString;
    })
    .collect(Collectors.toList());

它会打印出来

First print statement: 1
Second print statement: 1
First print statement: 2
Second print statement: 2
First print statement: 3
Second print statement: 3
First print statement: 4
Second print statement: 4
First print statement: 5
Second print statement: 5

显示流不像您期望的那样工作。参考实现明显倾向于在处理下一个元素之前将每个元素传递给整个流。当您启用并行处理时,将在每个 CPU 内核上执行相同的处理逻辑。

所以当我使用

List<String> parsedNumbers = IntStream.range(1, 6)
    .parallel()
    .mapToObj(String::valueOf)
    .map(integerAsString -> {
        System.out.println("First print statement: " + integerAsString);
        return integerAsString;
    })
    .map(integerAsString -> {
        System.out.println("Second print statement: " + integerAsString);
        return integerAsString;
    })
    .collect(Collectors.toList());

我的机器上有这样的东西:

First print statement: 5
First print statement: 2
First print statement: 1
First print statement: 4
First print statement: 3
Second print statement: 5
Second print statement: 2
Second print statement: 1
Second print statement: 4
Second print statement: 3

这可能看起来像是在第二个阶段之前将第一个打印语句处理为一个阶段,但这只是 CPU 核心多于流元素和一个幸运的时机的巧合。例如,当我将 range(1, 6) 更改为 range(1, 18) 时,我会得到类似

First print statement: 6
First print statement: 10
First print statement: 9
First print statement: 3
First print statement: 15
First print statement: 5
Second print statement: 9
First print statement: 11
First print statement: 8
Second print statement: 3
Second print statement: 11
Second print statement: 5
Second print statement: 10
Second print statement: 6
First print statement: 7
First print statement: 12
Second print statement: 8
Second print statement: 15
Second print statement: 12
Second print statement: 7
First print statement: 2
First print statement: 17
First print statement: 14
First print statement: 4
Second print statement: 14
Second print statement: 17
Second print statement: 2
First print statement: 1
First print statement: 16
First print statement: 13
Second print statement: 16
Second print statement: 1
Second print statement: 4
Second print statement: 13

不仅不保证处理的顺序,也不保证将处理哪些元素,例如

IntStream.range(1, 30)
    .filter(i -> i%13 == 1)
    .peek(i -> System.out.println("processing "+i))
    .parallel()
    .findFirst()
    .ifPresent(i -> System.out.println("result is "+i));

在我的设置中产生

processing 14
processing 1
processing 27
result is 1

因此,虽然结果保证为1,即遇到顺序中的第一个匹配元素,但不能保证不处理按遇到顺序跟随它的其他元素。

【讨论】:

    猜你喜欢
    • 1970-01-01
    • 2021-11-28
    • 2016-06-16
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多