【问题标题】:BlockingQueue vs PipedOutputStream and PipedInputStreamBlockingQueue 与 PipedOutputStream 和 PipedInputStream
【发布时间】:2023-04-01 13:34:01
【问题描述】:

我想知道使用BlockingQueue 而不是(PipedOutputStreamPipedInputStream)的优势

import java.io.*;
import java.util.concurrent.*;


public class PipedStreamVsBlocking {

  public static void main(String... args) {

    BlockingQueue<Integer> blockingQueue = new LinkedBlockingDeque<>(2);
    ExecutorService executor = Executors.newFixedThreadPool(4);
    Runnable producerTask = () -> {
      try {
        while (true) {
          int value = ThreadLocalRandom.current().nextInt(0, 1000);
          blockingQueue.put(value);
          System.out.println("BlockingQueue.Produced " + value);
          int timeSleeping = ThreadLocalRandom.current().nextInt(500, 1000);
          Thread.sleep(timeSleeping);
        }
      } catch (InterruptedException e) {
        e.printStackTrace();
      }
    };
    Runnable consumerTask = () -> {
      try {
        while (true) {
          int value = blockingQueue.take();
          System.out.println("BlockingQueue.Consume " + value);
          int timeSleeping = ThreadLocalRandom.current().nextInt(500, 1000);
          Thread.sleep(timeSleeping);
        }
      } catch (InterruptedException e) {
        e.printStackTrace();
      }
    };

    PipedOutputStream pipedSrc = new PipedOutputStream();
    PipedInputStream pipedSnk = new PipedInputStream();
    try {
      pipedSnk.connect(pipedSrc);
    } catch (IOException e) {
      e.printStackTrace();
    }

    Runnable runnablePut2 = () -> {
      try {
        ObjectOutputStream oos = new ObjectOutputStream(pipedSrc);
        while (true) {
          int value = ThreadLocalRandom.current().nextInt(0, 1000);
          oos.writeInt(value);
          oos.flush();
          System.out.println("PipedStream.Produced " + value);
          int timeSleeping = ThreadLocalRandom.current().nextInt(500, 1000);
          Thread.sleep(timeSleeping);
        }
      } catch (Exception e) {
        e.printStackTrace();
      }
    };

    Runnable runnableGet2 = () -> {
      try {
        ObjectInputStream ois = new ObjectInputStream(pipedSnk);
        while (true) {
          int value = ois.readInt();
          System.out.println("PipedStream.Consume " + value);
          int timeSleeping = ThreadLocalRandom.current().nextInt(500, 1000);
          Thread.sleep(timeSleeping);
        }
      } catch (Exception e) {
        e.printStackTrace();
      }
    };
    executor.execute(producerTask);
    executor.execute(consumerTask);
    executor.execute(runnablePut2);
    executor.execute(runnableGet2);
    executor.shutdown();
  }

}

这段代码的输出是:

BlockingQueue.Consume 298
BlockingQueue.Produced 298
PipedStream.Produced 510
PipedStream.Consume 510
BlockingQueue.Produced 536
BlockingQueue.Consume 536
PipedStream.Produced 751
PipedStream.Consume 751
PipedStream.Produced 619
BlockingQueue.Produced 584
BlockingQueue.Consume 584
PipedStream.Consume 619
BlockingQueue.Produced 327
PipedStream.Produced 72
BlockingQueue.Consume 327
PipedStream.Consume 72
BlockingQueue.Produced 823
BlockingQueue.Consume 823
PipedStream.Produced 544
PipedStream.Consume 544
BlockingQueue.Produced 352
BlockingQueue.Consume 352
PipedStream.Produced 134
PipedStream.Consume 134

我认为使用 PipedStream(PipedOutputStreamPipedInputStream)有优势,我知道何时直接生成/处理数据。

可能是我错了,这个 recommendation 使用 BlockingQueue 而不是 Pipe。

但是,文档中找不到您的 cmets/建议。 因此,我需要知道我错过了什么。

为什么要使用 BlockingQueue 而不是 Piped?

【问题讨论】:

  • “我知道什么时候直接生成/处理数据”是什么意思?
  • 在生产者/消费者看来stackoverflow.com/a/10416666/811293
  • 因为writeread“立即”检测到了,虽然认为BlockingQueue也很好。
  • 写入被另一个线程检测到,如果它恰好已经在读取操作中,无论你使用哪种构造。

标签: java-8 executorservice blockingqueue


【解决方案1】:

与任何 Java Collection 一样,BlockingQueue 存储对象的引用,因此从中检索对象的线程接收完全相同的运行时对象,生产线程放置进入它。

相比之下,序列化将持久形式存储到字节流中,它仅适用于Serializable 对象,并会导致在接收端创建副本。在某些情况下,对象可能会在之后被规范对象替换,但整个过程比仅传输引用要昂贵得多。

在您的示例中,当您传输 int 值时,对象身份无关紧要,但装箱、序列化、反序列化和取消装箱 Integer 实例的开销更值得怀疑。

如果您没有使用序列化,而是直接将int 值作为四个byte 数量进行传输,那么使用PipedOutputStreamPipedInputStream 是有道理的,因为它是传输大量原始数据的好工具.它还具有通过关闭管道来标记数据结束的内在支持。

这些管道也是软件的正确工具,这些软件应该与进程或什至运行生产者或消费者的计算机无关,即当管道实际上位于进程之间时,您希望能够使用相同的软件或甚至是网络连接。这也证明使用序列化是合理的(就像 JMX 连接一样)。

但除非您真正传输在被撕裂时仍保留其含义的单个字节,否则存在固有限制,即只有一个生产者可以写入管道,而只有一个消费者可以读取数据。

【讨论】:

    猜你喜欢
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2012-03-18
    • 2017-10-28
    • 1970-01-01
    • 2018-10-06
    • 1970-01-01
    • 2012-05-11
    相关资源
    最近更新 更多