【问题标题】:How to interconnect non-parallel stream with parallel stream(one producer multiple consumers)如何将非并行流与并行流互连(一个生产者多个消费者)
【发布时间】:2016-08-15 23:06:29
【问题描述】:

我正在尝试使用 Java8 中的流创建一个生产者多消费者模型。我正在从数据库资源读取和处理数据,并且 我想以流方式处理它们(无法将整个资源读入内存)。

源的读取必须是单线程的(游标不是线程安全的)并且读取速度快,每个数据块的处理是繁重的操作可以并行运行。

我还没有发现如何通过并行流处理加入(互连)非并行流。有什么办法可以用 Java8 流 API 来做吗?

代码示例:

这个迭代器必须在单线程中运行,因为游标不是线程安全的。

class SimpleIterator<Data> implements Iterator<Data>{

    private volatile Cursor cursor;

    public SimpleIterator(Cursor cursor){
        this.cursor = cursor;
    }

   @Override
    public boolean hasNext() {
        return cursor.hasNext();
    }    

    @Override
    public Data next() {
     return cursor.next();

    }
}

//创建非并行流

SimpleIterator<Data> iterator = new SimpleIterator<>(queryCursor);
Iterable<Data> iterable = () -> iterator;
Stream<Data> resultStream = StreamSupport.stream(iterable.spliterator(), false); // prallel set as false

//每个数据的处理数据应该并行运行

resultStream.parallel().forEach(data->processData(data)); 
public processData(Data data){
//heavy operation
}

但是,如果我在调用 forEach 之前将流设置为并行,则整个流是并行的,并且迭代器也在多个线程中调用。 有什么方法可以在 Java8 中互连这两个流,或者我必须创建一些队列,将数据从单线程生产者流提供到并行流。

【问题讨论】:

  • 我相信您在这里的假设是迭代器将被多个线程并行调用。这并不完全正确。它将被多个线程调用,是的 - 但不是并行的。您的代码所做的是创建一个大小未知的 IteratorSpliterator 来包装您的迭代器。 Spliterator 和 Iterator 都不需要是线程安全的(Fork/Join 负责这一点)。可以这样想:ArrayList 是线程安全的吗?没有。你能对它做并行流处理吗?是的,你可以。
  • 你说得对,我的假设和你描述的完全一样。感谢您向我澄清这一点。我仍然认为,如果多个线程调用我的迭代器,即使它们没有并行运行,也会出现一些线程问题。例如,如果光标不会被设置为 volatile,那么其他线程可以看到一些缓存对象而不是原始对象(例如)。
  • 然后确保您的光标安全发布。 IteratorSpliterator 将在 Iterator 上以不断增长的批次运行(每个批次都是 1024 的倍数,例如:3072、7168、11264...),并将 Iterator 中的元素复制到以当前批次大小为长度的数组中。从该数组创建一个 ArraySpliterator(可以进一步拆分)。只有在复制了一批时,您才会在迭代器中看到线程切换。
  • 感谢您的澄清。你帮了我很多。

标签: java multithreading java-stream


【解决方案1】:

我正在解决一个需要对两个流进行完全外部联接的问题。问题似乎相似。我所做的是插入两个阻塞队列来缓冲我的输入。我认为您可以对一个阻塞队列执行类似的操作,将单个流拆分为多个流,而无需并行化源流。

我提出的解决方案可以在下面找到。我还没有测试加入两个流的解决方案,所以我不确定这是否有效。 AbstractSpliterator 类有一个 trySplit 的实现; trySplit 上的 cmets 提供了丰富的信息。该类的最后一个方法从拆分器实现构造一个可并行化的流。

import java.util.Spliterators;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.function.Consumer;
import java.util.stream.Stream;

public class StreamSplitter<T> extends Spliterators.AbstractSpliterator<T> {
    final T EOS = null; // Just a stub -- can't put a null in BlockingQueue

    private final BlockingQueue<T> queue;
    private final Thread thread;

    // An implementation of Runnable that fills a queue from a stream
    private class Filler implements Runnable {
        private final Stream<T> stream;
        private final BlockingQueue<T> queue;

        private Filler(Stream<T> stream, BlockingQueue<T> queue) {
            this.stream = stream;
            this.queue = queue;
        }

        @Override
        public void run() {
            stream.forEach(x -> {
                try {
                    // Blocks if the queue is full
                    queue.put(x);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            });
            // Stream is drained put end of stream marker.
            try {
                queue.put(EOS);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }

    private StreamSplitter(long estSize, int characteristics, Stream<T> srcStream) {
        super(estSize, characteristics);
        queue = new ArrayBlockingQueue<T>(1024);
        // Fill the queue from a separate thread (may want to externalize this).
        thread = new Thread(new Filler(srcStream, queue));
        thread.start();
    }

    @Override
    public boolean tryAdvance(Consumer<? super T> action) {
        try {
            T value = queue.take(); // waits (blocks) for entries in queue

            // If end of stream marker is found, return false signifying
            // that the stream is finished.
            if (value == EOS) {
                return false;
            }
            // Accept the next value.
            action.accept(value);
        } catch (InterruptedException e) {
            return false;
        }
        return true;
    }

    public static <T> Stream<T> splitStream(long estSize, int characteristics, Stream<T> srcStream) {
        Spliterator<T> spliterator = new StreamSplitter<T>(estSize, characteristics, srcStream);
        return StreamSupport.stream(spliterator, true);
    }
}

【讨论】:

    猜你喜欢
    • 1970-01-01
    • 1970-01-01
    • 2019-03-06
    • 1970-01-01
    • 1970-01-01
    • 2015-09-25
    • 2012-04-17
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多