[Posted]: 2023-04-01 13:34:01
[Question]:
I would like to know the advantages of using BlockingQueue instead of piped streams (PipedOutputStream and PipedInputStream).
import java.io.*;
import java.util.concurrent.*;

public class PipedStreamVsBlocking {
    public static void main(String... args) {
        BlockingQueue<Integer> blockingQueue = new LinkedBlockingDeque<>(2);
        ExecutorService executor = Executors.newFixedThreadPool(4);

        Runnable producerTask = () -> {
            try {
                while (true) {
                    int value = ThreadLocalRandom.current().nextInt(0, 1000);
                    blockingQueue.put(value);
                    System.out.println("BlockingQueue.Produced " + value);
                    int timeSleeping = ThreadLocalRandom.current().nextInt(500, 1000);
                    Thread.sleep(timeSleeping);
                }
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        };

        Runnable consumerTask = () -> {
            try {
                while (true) {
                    int value = blockingQueue.take();
                    System.out.println("BlockingQueue.Consume " + value);
                    int timeSleeping = ThreadLocalRandom.current().nextInt(500, 1000);
                    Thread.sleep(timeSleeping);
                }
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        };

        PipedOutputStream pipedSrc = new PipedOutputStream();
        PipedInputStream pipedSnk = new PipedInputStream();
        try {
            pipedSnk.connect(pipedSrc);
        } catch (IOException e) {
            e.printStackTrace();
        }

        Runnable runnablePut2 = () -> {
            try {
                ObjectOutputStream oos = new ObjectOutputStream(pipedSrc);
                while (true) {
                    int value = ThreadLocalRandom.current().nextInt(0, 1000);
                    oos.writeInt(value);
                    oos.flush();
                    System.out.println("PipedStream.Produced " + value);
                    int timeSleeping = ThreadLocalRandom.current().nextInt(500, 1000);
                    Thread.sleep(timeSleeping);
                }
            } catch (Exception e) {
                e.printStackTrace();
            }
        };

        Runnable runnableGet2 = () -> {
            try {
                ObjectInputStream ois = new ObjectInputStream(pipedSnk);
                while (true) {
                    int value = ois.readInt();
                    System.out.println("PipedStream.Consume " + value);
                    int timeSleeping = ThreadLocalRandom.current().nextInt(500, 1000);
                    Thread.sleep(timeSleeping);
                }
            } catch (Exception e) {
                e.printStackTrace();
            }
        };

        executor.execute(producerTask);
        executor.execute(consumerTask);
        executor.execute(runnablePut2);
        executor.execute(runnableGet2);
        executor.shutdown();
    }
}
The output of this code is:
BlockingQueue.Consume 298
BlockingQueue.Produced 298
PipedStream.Produced 510
PipedStream.Consume 510
BlockingQueue.Produced 536
BlockingQueue.Consume 536
PipedStream.Produced 751
PipedStream.Consume 751
PipedStream.Produced 619
BlockingQueue.Produced 584
BlockingQueue.Consume 584
PipedStream.Consume 619
BlockingQueue.Produced 327
PipedStream.Produced 72
BlockingQueue.Consume 327
PipedStream.Consume 72
BlockingQueue.Produced 823
BlockingQueue.Consume 823
PipedStream.Produced 544
PipedStream.Consume 544
BlockingQueue.Produced 352
BlockingQueue.Consume 352
PipedStream.Produced 134
PipedStream.Consume 134
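I think piped streams (PipedOutputStream and PipedInputStream) have an advantage: I know exactly when the data is produced and when it is processed.
I may be wrong, though, since this recommendation says to use BlockingQueue rather than Pipe.
However, I could not find any comments or recommendations on this in the documentation, so I need to know what I am missing.
Why should BlockingQueue be used instead of piped streams?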
[Comments]:
-
What do you mean by "I know exactly when the data is produced/processed"?
-
In the producer/consumer sense; see stackoverflow.com/a/10416666/811293
-
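Because the write is detected "immediately" by the read, although I think BlockingQueue is fine too.
-
A write is detected by the other thread if that thread happens to already be inside a read operation, whichever construct you use.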
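
To make the last comment concrete, here is a minimal sketch (not taken from the question) in which a consumer thread is already blocked in a read when the producer writes, once with a BlockingQueue and once with a piped stream. The class name BlockedReaderDemo, the helper methods viaQueue and viaPipe, and the 100 ms pause that gives the consumer time to block are all assumptions made for illustration. In both cases the blocked reader should return as soon as the value is handed over.

```java
import java.io.IOException;
import java.io.PipedInputStream;
import java.io.PipedOutputStream;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

public class BlockedReaderDemo {

    // Hands one value to a consumer that is (very likely) already blocked in take().
    static int viaQueue(int value) throws InterruptedException {
        BlockingQueue<Integer> queue = new LinkedBlockingQueue<>(2);
        int[] received = new int[1];
        Thread consumer = new Thread(() -> {
            try {
                received[0] = queue.take(); // blocks until put() supplies an element
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt(); // restore the interrupt flag
            }
        });
        consumer.start();
        Thread.sleep(100);  // hypothetical pause so the consumer reaches take()
        queue.put(value);   // wakes the blocked consumer
        consumer.join();    // join() makes received[0] visible to this thread
        return received[0];
    }

    // Same idea with a pipe; the value must fit in one byte (0..255).
    static int viaPipe(int value) throws IOException, InterruptedException {
        PipedOutputStream out = new PipedOutputStream();
        PipedInputStream in = new PipedInputStream(out);
        int[] received = new int[1];
        Thread consumer = new Thread(() -> {
            try {
                received[0] = in.read(); // blocks until a byte is available
            } catch (IOException e) {
                received[0] = -2;        // marker for an I/O failure
            }
        });
        consumer.start();
        Thread.sleep(100);  // hypothetical pause so the consumer reaches read()
        out.write(value);   // writes the low 8 bits of value
        out.flush();        // flush() notifies readers waiting on the pipe
        consumer.join();
        out.close();
        return received[0];
    }

    public static void main(String[] args) throws Exception {
        System.out.println("queue: " + viaQueue(42));
        System.out.println("pipe: " + viaPipe(42));
    }
}
```

Note that the pipe variant depends on flush(): the reader side of a PipedInputStream otherwise re-checks for data only on a timed wait, whereas put() on the queue signals the waiting consumer directly. The queue also passes the Integer object itself, while the pipe only carries bytes.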
Tags: java-8 executorservice blockingqueue