【问题标题】:Solution to slow consumer(eventProcessor) issue in LMAX Disruptor patternLMAX Disruptor模式中消费者(eventProcessor)慢问题的解决方案
【发布时间】:2012-07-23 03:25:14
【问题描述】:

在使用中断器时,可能会有一个消费者落后,并且由于消费者速度慢,整个应用程序都会受到影响。

请记住,每个生产者(发布者)和消费者(事件处理器)都在一个线程上运行,有什么办法可以解决消费者缓慢的问题?

我们可以在单个消费者上使用多个线程吗?如果没有,有什么更好的选择?

【问题讨论】:

    标签: consumer disruptor-pattern


    【解决方案1】:

    一般来说,使用 WorkerPool 允许多个池化工作线程在单个使用者上工作,如果您有独立且可能持续时间可变的任务(例如:一些短任务,一些更长的任务),这是很好的。

    另一种选择是让多个独立的工作人员并行处理事件,但每个工作人员只处理模数 N 个工作人员(例如,2 个线程,一个线程处理奇数,一个线程处理偶数事件 ID)。如果您有一致的持续时间处理任务,这将非常有用,并且允许批处理也非常有效地工作。

    要考虑的另一件事是消费者可以进行“批处理”,这在例如审计中特别有用。如果您的消费者有 10 个事件等待,而不是单独将 10 个事件写入审计日志,您可以收集所有 10 个事件并同时写入它们。根据我的经验,这不仅仅涵盖了运行多个线程的需要。

    【讨论】:

    • 关于让一组 (N) 个并行消费者 (EventProcessor) 进行批处理的选项。您必须注意的一件事(至少对于 2.10.x 版本——我还没有深入研究 3.x)是,当您将工作分配给 N 个消费者时,您不能依赖每个消费者正在处理的 endOfBatch;只有在处理 endOfBatch 恰好为真的序列的 ONE 消费者集上才会注意到它(除非您作弊并在所有序列上查找 endOfBatch)。如果事件有任何停顿,其他消费者可能在很长一段时间内都看不到 endOfBatch=true。
    • 在当前代码行中,WorkHandlers 无权访问 endOfBatch 标志(即:池消费者甚至无法使用它)。
    • 但是如果事件必须按顺序处理呢?我可能在这里遗漏了一些东西,但看起来你建议的内容只有在你可以并行处理事件时才适用。
    • 顺序处理是关于“何时”和“关系”的。如果您可以将问题拆分为与任何其他工作没有依赖“关系”的“工作”,并“提交”以使该工作永久化,那么您可以安排您的 EventProcessors 以便您通过几个进行 mod-N 处理并发事件处理器,然后您使用单个“提交”事件处理器跟进这些事件处理器。单次提交仍然是一个瓶颈,所以这只有在“工作”比“提交”更昂贵的情况下才有效。假设在“提交”事件处理器中进行批处理,这很容易确保很多时间。
    【解决方案2】:

    尝试将慢速部分分离到其他线程(I/O,而不是 O(1) 或 O(log) 计算等),或者在消费者超载时施加某种背压(通过让步或临时停车)生产者,以 503 或 429 状态码重播等): http://mechanical-sympathy.blogspot.com/2012/05/apply-back-pressure-when-overloaded.html

    【讨论】:

      【解决方案3】:

      使用一组相同的事件处理程序。为了避免多个 eventHandler 作用于单个事件,我使用以下方法。

      创建一个大小为系统内核数的线程池

      Executor executor = Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors()); // a thread pool to which we can assign tasks
      

      然后创建一个handler数组

       HttpEventHandler [] handlers = new HttpEventHandler[Runtime.getRuntime().availableProcessors()];
      
      for(int i = 0; i<Runtime.getRuntime().availableProcessors();i++){
          handlers[i] = new HttpEventHandler(i);
      }
      
      disruptor.handleEventsWith(handlers);
      

      在事件处理程序中

      public void onEvent(HttpEvent event, long sequence, boolean endOfBatch) throws InterruptedException
      {
          if( sequence % Runtime.getRuntime().availableProcessors()==id){
      
              System.out.println("-----On event Triggered on thread "+Thread.currentThread().getName()+" on sequence "+sequence+" -----");
              //your event handler logic
      }
      

      【讨论】:

        猜你喜欢
        • 2013-03-20
        • 1970-01-01
        • 2016-08-08
        • 1970-01-01
        • 1970-01-01
        • 2014-08-12
        • 1970-01-01
        • 2017-09-08
        • 1970-01-01
        相关资源
        最近更新 更多