【问题标题】:BlockingQueue: how can multiple producers stop a single consumer?BlockingQueue:多个生产者如何阻止单个消费者?
【发布时间】:2013-11-16 15:39:48
【问题描述】:

我使用 Java 的 BlockingQueue 编写了一个基于生产者/消费者的程序。如果所有生产者都完成了,我正试图找到一种方法来阻止消费者。有多个生产者,但只有一个消费者。

我为“一个生产者,多个消费者”场景找到了几种解决方案,例如使用“完成的 paket / 毒丸”(参见 this discussion),但我的情况正好相反。

有没有最佳实践解决方案?

【问题讨论】:

  • 消费者通常会做什么来生成池任务?

标签: java multithreading producer-consumer


【解决方案1】:

最佳实践系统是使用count-down latch。这是否适合你更有趣......

也许每个生产者都应该向消费者注册和注销,当所有生产者都被注销(并且队列为空)时,消费者也可以终止。

【讨论】:

    【解决方案2】:

    大概您的生产者正在同一个 VM 中的不同线程中工作,并且在完成后退出。我会在一个循环中创建另一个对所有生产者调用 join() 的线程,当它存在该循环时(因为所有生产者线程都已结束),它会通知消费者它是时候退出了。这必须在另一个线程中运行,因为 join() 调用会阻塞。顺便说一句,如果我理解正确的话,rolfl 关于使用倒计时锁存器的建议会有问题。

    或者,如果生产者是 Callables,那么消费者可以在循环中调用他们的 Futures 上的 isDone() 和 isCanceled(),这不会阻塞,因此可以直接在消费者线程中使用。

    【讨论】:

      【解决方案3】:

      您可以使用以下内容,我使用registerProducer()unregisterProducer() 来跟踪生产者。另一种可能的解决方案是使用WeakReferences。

      值得一提的是,这个方案在consumer关闭时不会消费已经排队的事件,所以关闭时可能会丢失一些事件。

      如果消费者被中断然后处理它们,您将不得不排空队列。

      import java.util.ArrayList;
      import java.util.List;
      import java.util.concurrent.ArrayBlockingQueue;
      import java.util.concurrent.BlockingQueue;
      import java.util.concurrent.ExecutorService;
      import java.util.concurrent.Executors;
      import java.util.concurrent.TimeUnit;
      import java.util.concurrent.atomic.AtomicBoolean;
      import java.util.concurrent.atomic.AtomicInteger;
      
      public class TestConsumerShutdown {
      
        private static interface SomeEvent {
          String getName();
        }
      
        private static class Consumer implements Runnable {
      
          private final BlockingQueue<SomeEvent> queue = new ArrayBlockingQueue<>(10);
          private final ExecutorService consumerExecutor = Executors.newSingleThreadExecutor();
          private final AtomicBoolean isRunning = new AtomicBoolean();
          private final AtomicInteger numberProducers = new AtomicInteger(0);
      
      
          public void startConsumer() {
            consumerExecutor.execute(this);
          }
      
          public void stopConsumer() {
            consumerExecutor.shutdownNow();
            try {
              consumerExecutor.awaitTermination(Long.MAX_VALUE, TimeUnit.SECONDS);
            } catch (InterruptedException e) {
              Thread.currentThread().interrupt();
            }
          }
      
          public void registerProducer() {
            numberProducers.incrementAndGet();
          }
      
          public void unregisterProducer() {
            if (numberProducers.decrementAndGet() < 1) {
              stopConsumer();
            }
          }
      
          public void produceEvent(SomeEvent event) throws InterruptedException {
            queue.put(event);
          }
      
          @Override
          public void run() {
            if (isRunning.compareAndSet(false, true)) {
              try {
                while (!Thread.currentThread().isInterrupted()) {
                  SomeEvent event = queue.take();
                  System.out.println(event.getName());
                }
              } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
              } finally {
                System.out.println("Consumer stopped.");
                isRunning.set(false);
              }
            }
          }
        }
      
        public static void main(String[] args) {
          final Consumer consumer = new Consumer();
          consumer.startConsumer();
      
          final Runnable producerRunnable = new Runnable() {
      
            @Override
            public void run() {
              final String name = Thread.currentThread().getName();   
              consumer.registerProducer();
              try {
                for (int i = 0; i < 10; i++) {
                  consumer.produceEvent(new SomeEvent() {
      
      
      
                    @Override
                    public String getName() {
                      return name;
                    }
                  });
                }
                System.out.println("Produver " + name + " stopped.");
              } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
              } finally {
                consumer.unregisterProducer();
              }
      
            }
          };
          List<Thread> producers = new ArrayList<>();
          producers.add(new Thread(producerRunnable, "producer-1"));
          producers.add(new Thread(producerRunnable, "producer-2"));
          producers.add(new Thread(producerRunnable, "producer-3"));
      
          for (Thread t : producers) {
            t.start();
          }
      
        }
      }
      

      【讨论】:

        猜你喜欢
        • 1970-01-01
        • 1970-01-01
        • 1970-01-01
        • 2018-03-07
        • 2016-09-11
        • 2020-06-17
        • 2011-11-08
        • 1970-01-01
        • 1970-01-01
        相关资源
        最近更新 更多