【发布时间】:2014-10-16 19:22:15
【问题描述】:
在这种情况下(任何版本)我试图找出 Java 中的最佳方法来实现单个生产者多个消费者,其中我使用 ExecutorService(优先选择,不需要)生产者需要“永远”运行但每次运行时,它都需要等待所有内容都被完全处理,例如所有消费者线程都已终止,队列为空,并且没有任何项目要生产。生产者也应该只以固定的时间间隔轮询其数据源。
举个例子:我希望我的生产者每 30 分钟轮询一次它的数据源以获取记录并将它们提交到队列中。如果消费者的处理时间超过 30 分钟,我希望生产者等到所有项目都处理完毕后再再次轮询其数据源(因为它会在 30 分钟过去后立即这样做)。
不找人为我编写代码。一些基本的提示/指导将不胜感激。
这是我尝试使用的简化示例实现。我已经采取了所有可怕的尝试来解决这个问题。请注意,用于构造 ThreadPoolExecutor 的硬编码参数最终将是动态的。
import java.util.concurrent.*;
public class ItemProcessorService {
public static void main(String args[]) throws InterruptedException {
RejectedExecutionHandlerImpl rejectionHandler = new RejectedExecutionHandlerImpl();
ThreadFactory threadFactory = Executors.defaultThreadFactory();
int corePoolSize = 5,
maxPoolSize = 10,
keepAlive = 10,
queueCapacity = 1;
ThreadPoolExecutor executor = new ThreadPoolExecutor(corePoolSize, maxPoolSize, keepAlive, TimeUnit.SECONDS, new ArrayBlockingQueue<Runnable>(queueCapacity), threadFactory, rejectionHandler);
for (int i = 0; i < 10; i++) {
executor.execute(new ItemConsumer());
}
executor.shutdown();
while (!executor.isTerminated()) {
}
System.out.println("Executor finished");
}
}
class ItemConsumer implements Runnable {
@Override
public void run() {
processItem();
}
private void processItem() {
try {
Thread.sleep(3000);
System.out.println(Thread.currentThread().getName() + " - processed item");
} catch (InterruptedException e) {
//e.printStackTrace();
}
}
}
class RejectedExecutionHandlerImpl implements RejectedExecutionHandler {
@Override
public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
System.out.println(r.toString() + " - rejected");
}
}
【问题讨论】:
-
您应该使用
executor.awaitTermination(...),而不是在循环中忙着等待。 -
谢谢。这让我写起来很不舒服。我在一个示例中找到了它,并没有完全探索 executors 功能。我认为有一个体面的等待。
标签: java multithreading concurrency consumer producer