【发布时间】:2012-07-23 03:25:14
【问题描述】:
在使用中断器时,可能会有一个消费者落后,并且由于消费者速度慢,整个应用程序都会受到影响。
请记住,每个生产者(发布者)和消费者(事件处理器)都在一个线程上运行,有什么办法可以解决消费者缓慢的问题?
我们可以在单个消费者上使用多个线程吗?如果没有,有什么更好的选择?
【问题讨论】:
标签: consumer disruptor-pattern
在使用中断器时,可能会有一个消费者落后,并且由于消费者速度慢,整个应用程序都会受到影响。
请记住,每个生产者(发布者)和消费者(事件处理器)都在一个线程上运行,有什么办法可以解决消费者缓慢的问题?
我们可以在单个消费者上使用多个线程吗?如果没有,有什么更好的选择?
【问题讨论】:
标签: consumer disruptor-pattern
一般来说,使用 WorkerPool 允许多个池化工作线程在单个使用者上工作,如果您有独立且可能持续时间可变的任务(例如:一些短任务,一些更长的任务),这是很好的。
另一种选择是让多个独立的工作人员并行处理事件,但每个工作人员只处理模数 N 个工作人员(例如,2 个线程,一个线程处理奇数,一个线程处理偶数事件 ID)。如果您有一致的持续时间处理任务,这将非常有用,并且允许批处理也非常有效地工作。
要考虑的另一件事是消费者可以进行“批处理”,这在例如审计中特别有用。如果您的消费者有 10 个事件等待,而不是单独将 10 个事件写入审计日志,您可以收集所有 10 个事件并同时写入它们。根据我的经验,这不仅仅涵盖了运行多个线程的需要。
【讨论】:
尝试将慢速部分分离到其他线程(I/O,而不是 O(1) 或 O(log) 计算等),或者在消费者超载时施加某种背压(通过让步或临时停车)生产者,以 503 或 429 状态码重播等): http://mechanical-sympathy.blogspot.com/2012/05/apply-back-pressure-when-overloaded.html
【讨论】:
使用一组相同的事件处理程序。为了避免多个 eventHandler 作用于单个事件,我使用以下方法。
创建一个大小为系统内核数的线程池
Executor executor = Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors()); // a thread pool to which we can assign tasks
然后创建一个handler数组
HttpEventHandler [] handlers = new HttpEventHandler[Runtime.getRuntime().availableProcessors()];
for(int i = 0; i<Runtime.getRuntime().availableProcessors();i++){
handlers[i] = new HttpEventHandler(i);
}
disruptor.handleEventsWith(handlers);
在事件处理程序中
public void onEvent(HttpEvent event, long sequence, boolean endOfBatch) throws InterruptedException
{
if( sequence % Runtime.getRuntime().availableProcessors()==id){
System.out.println("-----On event Triggered on thread "+Thread.currentThread().getName()+" on sequence "+sequence+" -----");
//your event handler logic
}
【讨论】: