【问题标题】:Java Spliterator Continually Splits Parallel StreamJava Spliterator 不断拆分并行流
【发布时间】:2021-03-18 22:02:29
【问题描述】:

我发现 Java 并行流有一些令人惊讶的行为。我创建了自己的Spliterator,生成的并行流被分割,直到每个流中只有一个元素。这似乎太小了,我想知道我做错了什么。我希望我可以设置一些特性来纠正这个问题。

这是我的测试代码。这里的Float 只是一个虚拟的payload,我真正的流类要复杂一些。

   public static void main( String[] args ) {
      TestingSpliterator splits = new TestingSpliterator( 10 );
      Stream<Float> test = StreamSupport.stream( splits, true );
      double total = test.mapToDouble( Float::doubleValue ).sum();
      System.out.println( "Total: " + total );
   }

此代码将不断拆分此流,直到每个 Spliterator 都只有一个元素。这似乎太高效了。

输出:

run:
Split on count: 10
Split on count: 5
Split on count: 3
Split on count: 5
Split on count: 2
Split on count: 2
Split on count: 3
Split on count: 2
Split on count: 2
Total: 5.164293184876442
BUILD SUCCESSFUL (total time: 0 seconds)

这是Spliterator 的代码。我主要关心的是我应该使用什么特性,但也许其他地方有问题?

public class TestingSpliterator implements Spliterator<Float> {
   int count;
   int splits;

   public TestingSpliterator( int count ) {
      this.count = count;
   }

   @Override
   public boolean tryAdvance( Consumer<? super Float> cnsmr ) {
      if( count > 0 ) {
         cnsmr.accept( (float)Math.random() );
         count--;
         return true;
      } else
         return false;
   }

   @Override
   public Spliterator<Float> trySplit() {
      System.err.println( "Split on count: " + count );
      if( count > 1 ) {
         splits++;
         int half = count / 2;
         TestingSpliterator newSplit = new TestingSpliterator( count - half );
         count = half;
         return newSplit;
      } else
         return null;
   }

   @Override
   public long estimateSize() {
      return count;
   }

   @Override
   public int characteristics() {
      return IMMUTABLE | SIZED;
   }
}

那么我怎样才能让流被分成更大的块呢?我希望在 10,000 到 50,000 附近会更好。

我知道我可以从trySplit() 方法返回null,但这似乎是一种倒退的方式。系统似乎应该对内核数量、当前负载以及使用流的代码的复杂程度有一些概念,并相应地进行自我调整。换句话说,我希望流块大小由外部配置,而不是由流本身在内部固定。

编辑:重新。 Holger 在下面的回答是,当我增加原始流中的元素数量时,流拆分会有所减少,因此 StreamSupport 最终会停止拆分。

在初始流大小为 100 个元素时,StreamSupport 在流大小达到 2 时停止拆分(我在屏幕上看到的最后一行是 Split on count: 4)。

对于 1000 个元素的初始流大小,各个流块的最终大小约为 32 个元素。


Edit part deux:查看上述输出后,我更改了代码以列出创建的个人Spliterators。变化如下:

   public static void main( String[] args ) {
      TestingSpliterator splits = new TestingSpliterator( 100 );
      Stream<Float> test = StreamSupport.stream( splits, true );
      double total = test.mapToDouble( Float::doubleValue ).sum();
      System.out.println( "Total Spliterators: " + testers.size() );
      for( TestingSpliterator t : testers ) {
         System.out.println( "Splits: " + t.splits );
      }
   } 

还有 TestingSpliterator 的 ctor:

   static Queue<TestingSpliterator> testers = new ConcurrentLinkedQueue<>();

   public TestingSpliterator( int count ) {
      this.count = count;
      testers.add( this ); // OUCH! 'this' escape
   }

这段代码的结果是第一个Spliterator 被拆分了5 次。 nextSpliterator 被拆分 4 次。下一组Spliterators 被拆分 3 次。等等。结果是生成了 36 个Spliterators,并且流被分成了尽可能多的部分。在典型的桌面系统上,这似乎是 API 认为最适合并行操作的方式。

我将在下面接受 Holger 的回答,这本质上是 StreamSupport 班级正在做正确的事,别担心,开心就好。对我来说,部分问题是我正在对非常小的流大小进行早期测试,我对拆分的数量感到惊讶。不要自己犯同样的错误。

【问题讨论】:

    标签: java java-stream


    【解决方案1】:

    你从错误的角度看它。该实现没有拆分“直到每个拆分器都有一个元素”,而是拆分“直到有十个拆分器”。

    单个拆分器实例只能由一个线程处理。拆分器在开始遍历后不需要支持拆分。因此,任何事先未使用的拆分机会都可能导致之后的并行处理能力受限。

    请务必记住,Stream 实现收到了带有未知工作负载的 ToDoubleFunction。它不知道在你的情况下它就像Float::doubleValue 一样简单。它可能是一个需要花一分钟时间评估的函数,然后每个 CPU 核心都有一个分离器是正确的。即使拥有多个 CPU 内核也是一种有效的策略,可以处理某些评估所需时间明显长于其他评估的可能性。

    初始拆分器的典型数量将是“CPU 内核数”× 4,尽管稍后当对实际工作负载有更多了解时,这里可能会进行更多拆分操作。当您的输入数据少于该数字时,将其拆分直到每个拆分器只剩下一个元素就不足为奇了。

    您可以尝试使用new TestingSpliterator( 10000 )1000100 来查看拆分的数量不会有显着变化,一旦实现假设有足够的块来保持所有CPU 内核繁忙。

    由于您的拆分器也不了解消费流的每个元素的工作负载,因此您不必担心这一点。如果您可以顺利地支持拆分为单个元素,那就这样做吧。

    ¹不过,它没有针对没有链接操作的情况进行特殊优化。

    【讨论】:

    • 在 100 个元素时,StreamSupport 将拆分为 2 的流长度(我看到打印的最后一个拆分是 Split on count: 4),对于 1000 个元素的总长度,拆分下降到大约 32元素。我会更新我的问题。
    • 我仍然对它以这种方式工作感到有些惊讶,但在许多情况下拆分这么多似乎是有害的。我仍然试图将我的头脑围绕这样的想法,即这可能是一种默认行为。
    • 正如this answer中所说的“如果你请求并行,你就会得到并行,即使它实际上会降低性能。”如图所示,你甚至可以处理两个元素当工作量足够大时并行。请注意,拆分那么深并不意味着它在不同的线程上运行;如果实际工作负载较低,则本地处理线程可能会在另一个线程拾取它之前获取下一个块。然后,您只需要创建另一个轻量级对象的少量开销。 “并行会有好处”没有神奇的门槛
    • 好的,我确信StreamSupport 类没有损坏,它的行为是合理的。感谢您为我指明正确的方向。
    【解决方案2】:

    除非我遗漏了明显的内容,否则您始终可以在构造函数中传递 bufferSize 并将其用于您的 trySplit

    @Override
    public Spliterator<Float> trySplit() {
    
         if( count > 1 ) {
            splits++;
            if(count > bufferSize) {
                count = count - bufferSize;
                return new TestingSpliterator( bufferSize, bufferSize);
            }
    
        }
        return null;
    }
    

    还有这个:

    TestingSpliterator splits = new TestingSpliterator(12, 5);
    Stream<Float> test = StreamSupport.stream(splits, true);
    
    test.map(x -> new AbstractMap.SimpleEntry<>(
                       x.doubleValue(), 
                       Thread.currentThread().getName()))
        .collect(Collectors.groupingBy(
                    Map.Entry::getValue, 
                    Collectors.mapping(
                         Map.Entry::getKey, 
                         Collectors.toList())))
        .forEach((x, y) -> System.out.println("Thread : " + x + " processed : " + y));
    

    您会看到有 3 个线程。其中两个处理5 元素和一个2

    【讨论】:

    • 好吧,我想这里很明显是我希望 API 提供的东西可以为我做这件事。我有点惊讶我必须自己管理这个,API 没有提供合理的默认拆分。我将对此进行更多讨论,但非常感谢您的意见。
    • 当然,如果拆分器在不知道实际流操作的情况下决定 bufferSize 是有效的,那将是倒退。有人决定将十个元素的流变为并行(将true 传递给StreamSupport.stream( splits, true )),当知道应该做什么工作的人有效地关闭并行处理不是 Stream API 实现(也不是拆分器)的任务to be done 决定做并行。
    猜你喜欢
    • 2019-11-17
    • 2018-10-29
    • 2018-03-14
    • 2018-03-24
    • 1970-01-01
    • 1970-01-01
    • 2018-01-14
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多