【问题标题】:BlockingQueue: how can multiple producers stop a single consumer?BlockingQueue:多个生产者如何阻止单个消费者?
【发布时间】:2013-11-16 15:39:48
【问题描述】:
我使用 Java 的 BlockingQueue 编写了一个基于生产者/消费者的程序。如果所有生产者都完成了,我正试图找到一种方法来阻止消费者。有多个生产者,但只有一个消费者。
我为“一个生产者,多个消费者”场景找到了几种解决方案,例如使用“完成的 paket / 毒丸”(参见 this discussion),但我的情况正好相反。
有没有最佳实践解决方案?
【问题讨论】:
标签:
java
multithreading
producer-consumer
【解决方案1】:
最佳实践系统是使用count-down latch。这是否适合你更有趣......
也许每个生产者都应该向消费者注册和注销,当所有生产者都被注销(并且队列为空)时,消费者也可以终止。
【解决方案2】:
大概您的生产者正在同一个 VM 中的不同线程中工作,并且在完成后退出。我会在一个循环中创建另一个对所有生产者调用 join() 的线程,当它存在该循环时(因为所有生产者线程都已结束),它会通知消费者它是时候退出了。这必须在另一个线程中运行,因为 join() 调用会阻塞。顺便说一句,如果我理解正确的话,rolfl 关于使用倒计时锁存器的建议会有问题。
或者,如果生产者是 Callables,那么消费者可以在循环中调用他们的 Futures 上的 isDone() 和 isCanceled(),这不会阻塞,因此可以直接在消费者线程中使用。
【解决方案3】:
您可以使用以下内容,我使用registerProducer() 和unregisterProducer() 来跟踪生产者。另一种可能的解决方案是使用WeakReferences。
值得一提的是,这个方案在consumer关闭时不会消费已经排队的事件,所以关闭时可能会丢失一些事件。
如果消费者被中断然后处理它们,您将不得不排空队列。
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
public class TestConsumerShutdown {
private static interface SomeEvent {
String getName();
}
private static class Consumer implements Runnable {
private final BlockingQueue<SomeEvent> queue = new ArrayBlockingQueue<>(10);
private final ExecutorService consumerExecutor = Executors.newSingleThreadExecutor();
private final AtomicBoolean isRunning = new AtomicBoolean();
private final AtomicInteger numberProducers = new AtomicInteger(0);
public void startConsumer() {
consumerExecutor.execute(this);
}
public void stopConsumer() {
consumerExecutor.shutdownNow();
try {
consumerExecutor.awaitTermination(Long.MAX_VALUE, TimeUnit.SECONDS);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
public void registerProducer() {
numberProducers.incrementAndGet();
}
public void unregisterProducer() {
if (numberProducers.decrementAndGet() < 1) {
stopConsumer();
}
}
public void produceEvent(SomeEvent event) throws InterruptedException {
queue.put(event);
}
@Override
public void run() {
if (isRunning.compareAndSet(false, true)) {
try {
while (!Thread.currentThread().isInterrupted()) {
SomeEvent event = queue.take();
System.out.println(event.getName());
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
} finally {
System.out.println("Consumer stopped.");
isRunning.set(false);
}
}
}
}
public static void main(String[] args) {
final Consumer consumer = new Consumer();
consumer.startConsumer();
final Runnable producerRunnable = new Runnable() {
@Override
public void run() {
final String name = Thread.currentThread().getName();
consumer.registerProducer();
try {
for (int i = 0; i < 10; i++) {
consumer.produceEvent(new SomeEvent() {
@Override
public String getName() {
return name;
}
});
}
System.out.println("Produver " + name + " stopped.");
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
} finally {
consumer.unregisterProducer();
}
}
};
List<Thread> producers = new ArrayList<>();
producers.add(new Thread(producerRunnable, "producer-1"));
producers.add(new Thread(producerRunnable, "producer-2"));
producers.add(new Thread(producerRunnable, "producer-3"));
for (Thread t : producers) {
t.start();
}
}
}