【发布时间】:2012-10-08 08:51:30
【问题描述】:
Java 是否支持任何队列对象或机制来处理批处理?
ex:我们有一个队列(或任何想要的队列对象),一些生产者将项目一一推送到队列中,我的目标是当我们在这个队列中有 10 个项目或超过 10 个项目时,我们可以触发一些处理程序一批处理。
或者它不是自动触发的,我们需要找到一种方法在处理程序端优雅地循环队列。
我们是否有典型的高性能对象或库来处理这个问题?
谢谢, 埃姆雷
【问题讨论】:
Java 是否支持任何队列对象或机制来处理批处理?
ex:我们有一个队列(或任何想要的队列对象),一些生产者将项目一一推送到队列中,我的目标是当我们在这个队列中有 10 个项目或超过 10 个项目时,我们可以触发一些处理程序一批处理。
或者它不是自动触发的,我们需要找到一种方法在处理程序端优雅地循环队列。
我们是否有典型的高性能对象或库来处理这个问题?
谢谢, 埃姆雷
【问题讨论】:
队列中的批处理可以通过等待/通知来实现,就像你会阻止对资源的线程调用,直到它可用或不可用。
public class MyQueue implements Queue<Object>{
public synchronized List<Object> peek() {
if(this.list.size()>=10)
this.list.wait();
return Collections.subList(0,10);
}
@Override
public boolean add(Object e) {
this.list.add(e);
if(this.list.size()>=10)
this.list.notifyAll();
return false;
}
}
不会自动触发
在这种情况下,您可以在指定超时后调用等待。
【讨论】:
您可以使用BlockingQueue.drainTo()自动获取批量要执行的任务。这适用于每秒超过 100K 的任务。
如果您需要更高性能的队列,您可以使用更复杂的 Disruptor 或 Java Chronicle,它们每秒可以排队处理数百万个任务,都支持自动批处理。
【讨论】:
以下是批量处理对象的快速尝试,使用后台线程收集和处理由其他线程推送到队列中的对象:
public abstract class Batcher<E> implements Runnable {
public static interface BatchProcessor<E> {
public void processBatch(List<E> batch);
}
private final BlockingQueue<E> queue;
private final BatchProcessor<E> processor;
private Batcher(BlockingQueue<E> queue, BatchProcessor<E> processor) {
this.queue = queue;
this.processor = processor;
}
@Override
public void run() {
try {
while (true) {
List<E> batch = new ArrayList<E>();
for (int i = 0; i < 10; i++) {
batch.add(queue.take());
}
processor.processBatch(batch);
}
} catch (InterruptedException e) {
return;
}
}
}
要使用它,您创建一个BlockingQueue 并在其上放置对象,创建一个BatchProcessor 实现的实例来处理批次,然后创建一个Batcher 的实例以将对象从前者泵送到后者。
【讨论】:
查看接口java.util.Queue的API文档,它有几个实现。
还有一个标准 API,Java Message Service (JMS) 来处理队列系统,以便在不同进程之间交换消息。
【讨论】:
我认为CountDownLatch 是您需要的,或者可能是CyclicBarrier。这将允许您设置一个同步点,该点将在发生一定数量的操作后触发消费者,并且您可以使用标准队列作为容器对象。
【讨论】:
CountDownLatch 在有多个线程产生或多个线程消耗(或两者兼有)时特别有用。目前尚不清楚这里的情况。