【发布时间】:2021-03-18 22:02:29
【问题描述】:
我发现 Java 并行流有一些令人惊讶的行为。我创建了自己的Spliterator,生成的并行流被分割,直到每个流中只有一个元素。这似乎太小了,我想知道我做错了什么。我希望我可以设置一些特性来纠正这个问题。
这是我的测试代码。这里的Float 只是一个虚拟的payload,我真正的流类要复杂一些。
public static void main( String[] args ) {
TestingSpliterator splits = new TestingSpliterator( 10 );
Stream<Float> test = StreamSupport.stream( splits, true );
double total = test.mapToDouble( Float::doubleValue ).sum();
System.out.println( "Total: " + total );
}
此代码将不断拆分此流,直到每个 Spliterator 都只有一个元素。这似乎太高效了。
输出:
run:
Split on count: 10
Split on count: 5
Split on count: 3
Split on count: 5
Split on count: 2
Split on count: 2
Split on count: 3
Split on count: 2
Split on count: 2
Total: 5.164293184876442
BUILD SUCCESSFUL (total time: 0 seconds)
这是Spliterator 的代码。我主要关心的是我应该使用什么特性,但也许其他地方有问题?
public class TestingSpliterator implements Spliterator<Float> {
int count;
int splits;
public TestingSpliterator( int count ) {
this.count = count;
}
@Override
public boolean tryAdvance( Consumer<? super Float> cnsmr ) {
if( count > 0 ) {
cnsmr.accept( (float)Math.random() );
count--;
return true;
} else
return false;
}
@Override
public Spliterator<Float> trySplit() {
System.err.println( "Split on count: " + count );
if( count > 1 ) {
splits++;
int half = count / 2;
TestingSpliterator newSplit = new TestingSpliterator( count - half );
count = half;
return newSplit;
} else
return null;
}
@Override
public long estimateSize() {
return count;
}
@Override
public int characteristics() {
return IMMUTABLE | SIZED;
}
}
那么我怎样才能让流被分成更大的块呢?我希望在 10,000 到 50,000 附近会更好。
我知道我可以从trySplit() 方法返回null,但这似乎是一种倒退的方式。系统似乎应该对内核数量、当前负载以及使用流的代码的复杂程度有一些概念,并相应地进行自我调整。换句话说,我希望流块大小由外部配置,而不是由流本身在内部固定。
编辑:重新。 Holger 在下面的回答是,当我增加原始流中的元素数量时,流拆分会有所减少,因此 StreamSupport 最终会停止拆分。
在初始流大小为 100 个元素时,StreamSupport 在流大小达到 2 时停止拆分(我在屏幕上看到的最后一行是 Split on count: 4)。
对于 1000 个元素的初始流大小,各个流块的最终大小约为 32 个元素。
Edit part deux:查看上述输出后,我更改了代码以列出创建的个人Spliterators。变化如下:
public static void main( String[] args ) {
TestingSpliterator splits = new TestingSpliterator( 100 );
Stream<Float> test = StreamSupport.stream( splits, true );
double total = test.mapToDouble( Float::doubleValue ).sum();
System.out.println( "Total Spliterators: " + testers.size() );
for( TestingSpliterator t : testers ) {
System.out.println( "Splits: " + t.splits );
}
}
还有 TestingSpliterator 的 ctor:
static Queue<TestingSpliterator> testers = new ConcurrentLinkedQueue<>();
public TestingSpliterator( int count ) {
this.count = count;
testers.add( this ); // OUCH! 'this' escape
}
这段代码的结果是第一个Spliterator 被拆分了5 次。 nextSpliterator 被拆分 4 次。下一组Spliterators 被拆分 3 次。等等。结果是生成了 36 个Spliterators,并且流被分成了尽可能多的部分。在典型的桌面系统上,这似乎是 API 认为最适合并行操作的方式。
我将在下面接受 Holger 的回答,这本质上是 StreamSupport 班级正在做正确的事,别担心,开心就好。对我来说,部分问题是我正在对非常小的流大小进行早期测试,我对拆分的数量感到惊讶。不要自己犯同样的错误。
【问题讨论】:
标签: java java-stream