【问题标题】:Understanding sequential vs parallel stream spliterators in Java 8 and Java 9了解 Java 8 和 Java 9 中的顺序与并行流拆分器
【发布时间】:2018-03-24 09:10:28
【问题描述】:

一个乍一看并不简单的关于拆分器的问题。

在流中,.parallel() 改变了流处理的行为。但是,我期望从顺序流和并行流创建的拆分器是相同的。例如,通常在顺序流中,.trySplit() 永远不会被调用,而在并行流中,它会被调用,以便将拆分拆分器移交给另一个线程。

stream.spliterator()stream.parallel().spliterator() 之间的差异:

  1. 它们可能具有不同的特征:

    Stream.of(1L, 2L, 3L).limit(2);            // ORDERED
    Stream.of(1L, 2L, 3L).limit(2).parallel(); // SUBSIZED, SIZED, ORDERED
    

这里讨论的似乎是另一个废话流拆分器特征策略(并行计算似乎更好):Understanding deeply spliterator characteristics in java 8 and java 9

  1. 他们在使用.trySplit()进行拆分方面可能有不同的行为:

    Stream.of(1L, 2L, 3L);                     // NON NULL
    Stream.of(1L, 2L, 3L).limit(2);            // NULL
    Stream.of(1L, 2L, 3L).limit(2).parallel(); // NON NULL
    

为什么最后两个有不同的行为?如果我愿意,为什么我不能拆分顺序流? (例如,丢弃其中一个拆分以进行快速处理可能很有用)。

  1. 将拆分器转换为流时的重大影响:

    spliterator = Stream.of(1L, 2L, 3L).limit(2).spliterator();
    stream = StreamSupport.stream(spliterator, true); // No parallel processing!
    

在这种情况下,拆分器是从禁用拆分功能的顺序流创建的(.trySplit() 返回 null)。稍后,需要转换回流时,该流将无法从并行处理中受益。可惜了。

大问题:作为一种解决方法,在调用.spliterator() 之前总是将流转换为并行的主要影响是什么?

// Supports activation of parallel processing later
public static <T> Stream<T> myOperation(Stream<T> stream) {
    boolean isParallel = stream.isParallel();
    Spliterator<T> spliterator = stream.parallel().spliterator();
    return StreamSupport.stream(new Spliterator<T>() {
        // My implementation of the interface here (omitted for clarity)
    }, isParallel).onClose(stream::close);
}

// Now I have the option to use parallel processing when needed:
myOperation(stream).skip(1).parallel()...

【问题讨论】:

  • 期望从顺序流和并行流创建的拆分器是相同的您为什么会这样?您的问题最终不会转变为并行流还是顺序流?
  • 单一责任原则。拆分器只是拆分器,他只知道如何拆分迭代器。流应该具有创建非线程的逻辑。 (这是预期)。
  • 您可能还记得this answer 的 cmets 中已经讨论了这种特定于实现的行为。在那里,Tagir Valeev 给出了有趣的评论:“另一方面,你不能盲目地使用 parallel(),因为这实际上可能会并行执行一些操作(如排序),意外地消耗更多的 CPU 内核”,它解决了您问题的一部分(始终将流转换为并行的主要影响)......
  • 谢谢@Holger。我怎样才能更好地理解影响?这些操作什么时候完成?只在分裂期间? (如果是,可能问题更小,因为顺序流从不调用.trySplit())。为什么需要进行这些操作?有什么地方可以得到这些信息吗? (顺便说一句:这是我正在寻找的答案)

标签: java java-8 java-stream java-9 spliterator


【解决方案1】:

这不是拆分器的一般属性,而只是封装流管道的包装拆分器。

当您在从拆分器生成且没有链接操作的流上调用 spliterator() 时,您将获得可能支持或不支持 trySplit 的源拆分器,无论流 parallel状态。

ArrayList<String> list = new ArrayList<>();
Collections.addAll(list, "foo", "bar", "baz");
Spliterator<String> sp1 = list.spliterator(), sp2=list.stream().spliterator();
// true
System.out.println(sp1.getClass()==sp2.getClass());
// not null
System.out.println(sp2.trySplit());

同样

Spliterator<String> sp = Stream.of("foo", "bar", "baz").spliterator();
// not null
System.out.println(sp.trySplit());

但是,只要在调用spliterator() 之前链接操作,您就会得到一个包装流管道的拆分器。现在,可以实现执行相关操作的专用拆分器,例如 LimitSpliteratorMappingSpliterator,但这还没有完成,因为将流转换回拆分器已被视为最后的手段,而另一个终端操作不适合,不是高优先级用例。相反,您将始终获得单个实现类的实例,该实例试图将流管道实现的内部工作转换为拆分器 API。

对于有状态的操作,这可能很复杂,最值得注意的是,sorteddistinctskip&limit 用于非SIZED 流。对于琐碎的无状态操作,例如mapfilter,提供支持会容易得多,甚至remarked in a code comment

在第一次操作时绑定到管道助手的拆分器的抽象包装拆分器。 此拆分器不是后期绑定的,并且在首次操作时将绑定到源拆分器。 如果存在有状态操作,则无法拆分从顺序流生成的包装拆分器。

…

   // @@@ Detect if stateful operations are present or not
   //     If not then can split otherwise cannot

   /**
    * True if this spliterator supports splitting
    */
   final boolean isParallel;

但目前似乎还没有实现这种检测,所有中间操作都被视为有状态操作。

Spliterator<String> sp = Stream.of("foo", "bar", "baz").map(x -> x).spliterator();
// null
System.out.println(sp.trySplit());

当您尝试通过始终调用 parallel 来解决此问题时,当流管道仅包含无状态操作时不会产生任何影响。但是当有一个有状态的操作时,它可能会显着改变行为。例如,当您有一个sorted 步骤时,所有元素都必须经过缓冲和排序,然后才能使用第一个元素。对于并行流,它可能会使用parallelSort,即使您从未调用过trySplit

【讨论】:

  • 谢谢@Holger。即使使用.sorted(),如果我不打电话给.parallel(),应该不会有影响吧?示例:StreamSupport.stream(Stream.of("foo", "bar", "baz").map(x -&gt; x).parallel().spliterator(), false).sorted().limit(2).skip(1).findFirst() 如果结果是顺序流,在.spliterator() 之前调用.parallel() 有什么影响?
  • 你误会了。当在spliterator() 之前插入parallel() 时,就像在Arrays.stream(array).map(function).sorted(comparator).parallel().spliterator().tryAdvance() 中一样,有状态操作可能会并行运行,处理整个数组并要求functioncomparator 是线程安全的。这是一个重大影响。
  • 谢谢@Holger。我做了一些单元测试,甚至只使用.tryAdvanced().forEachRemaining() 而不使用.trySplit() 当涉及.distict().sorted() 操作时正在创建线程。
猜你喜欢
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 2015-07-07
  • 2016-02-28
  • 1970-01-01
  • 1970-01-01
  • 2014-07-24
相关资源
最近更新 更多