【发布时间】:2016-08-15 23:06:29
【问题描述】:
我正在尝试使用 Java8 中的流创建一个生产者多消费者模型。我正在从数据库资源读取和处理数据,并且 我想以流方式处理它们(无法将整个资源读入内存)。
源的读取必须是单线程的(游标不是线程安全的)并且读取速度快,每个数据块的处理是繁重的操作可以并行运行。
我还没有发现如何通过并行流处理加入(互连)非并行流。有什么办法可以用 Java8 流 API 来做吗?
代码示例:
这个迭代器必须在单线程中运行,因为游标不是线程安全的。
class SimpleIterator<Data> implements Iterator<Data>{
private volatile Cursor cursor;
public SimpleIterator(Cursor cursor){
this.cursor = cursor;
}
@Override
public boolean hasNext() {
return cursor.hasNext();
}
@Override
public Data next() {
return cursor.next();
}
}
//创建非并行流
SimpleIterator<Data> iterator = new SimpleIterator<>(queryCursor);
Iterable<Data> iterable = () -> iterator;
Stream<Data> resultStream = StreamSupport.stream(iterable.spliterator(), false); // prallel set as false
//每个数据的处理数据应该并行运行
resultStream.parallel().forEach(data->processData(data));
public processData(Data data){
//heavy operation
}
但是,如果我在调用 forEach 之前将流设置为并行,则整个流是并行的,并且迭代器也在多个线程中调用。 有什么方法可以在 Java8 中互连这两个流,或者我必须创建一些队列,将数据从单线程生产者流提供到并行流。
【问题讨论】:
-
我相信您在这里的假设是迭代器将被多个线程并行调用。这并不完全正确。它将被多个线程调用,是的 - 但不是并行的。您的代码所做的是创建一个大小未知的 IteratorSpliterator 来包装您的迭代器。 Spliterator 和 Iterator 都不需要是线程安全的(Fork/Join 负责这一点)。可以这样想:ArrayList 是线程安全的吗?没有。你能对它做并行流处理吗?是的,你可以。
-
你说得对,我的假设和你描述的完全一样。感谢您向我澄清这一点。我仍然认为,如果多个线程调用我的迭代器,即使它们没有并行运行,也会出现一些线程问题。例如,如果光标不会被设置为 volatile,那么其他线程可以看到一些缓存对象而不是原始对象(例如)。
-
然后确保您的光标安全发布。 IteratorSpliterator 将在 Iterator 上以不断增长的批次运行(每个批次都是 1024 的倍数,例如:3072、7168、11264...),并将 Iterator 中的元素复制到以当前批次大小为长度的数组中。从该数组创建一个 ArraySpliterator(可以进一步拆分)。只有在复制了一批时,您才会在迭代器中看到线程切换。
-
感谢您的澄清。你帮了我很多。
标签: java multithreading java-stream