【问题标题】:Using disruptor in the Java Servlet and handling multiple events [closed]在 Java Servlet 中使用中断器并处理多个事件 [关闭]
【发布时间】:2013-08-22 08:22:21
【问题描述】:

我在我的 Web 应用程序中使用 LMAX 中断器,它接受 http 请求参数并将它们处理到环形缓冲区。 3 个事件处理器处理和处理数据,最后一个将其保存到数据库中。 当 servlet 被实例化时,初始化一次 ringbuffer。是这样吗?

public void init() throws ServletException {

    Disruptor<CampaignCode> disruptor = new Disruptor<CampaignCode>(
            CampaignCode.FACTORY, 1024, Executors.newCachedThreadPool());

    EventHandler<CampaignCode> campaignDetailsLoader = new CampaignDetailsLoaderEvent();
    EventHandler<CampaignCode> templateEvent = new TemplateBuildEvent();
    EventHandler<CampaignCode> codeGenerator = new CodeGenerationEventHandler();
    EventHandler<CampaignCode> campaignSaveEventHandler= new CampaignSaveEventHandler();

    disruptor.handleEventsWith(templateEvent, campaignDetailsLoader).then(
            codeGenerator).then(campaignSaveEventHandler);
    this.ringBuffer = disruptor.start();
}

这里我将值直接放入环形缓冲区

    @Override
protected void doPost(HttpServletRequest request,
        HttpServletResponse response) throws ServletException, IOException      {
    String campaignId = request.getParameter("campaignId");
    String campaignType = request.getParameter("campaignType");
    if (campaignId != null && !campaignId.isEmpty()) {
        long sequence = ringBuffer.next();
        CampaignCode campaign = ringBuffer.get(sequence);
        campaign.setCampaignId(Long.parseLong(campaignId));
        campaign.setCampaignType(campaignType);
        ringBuffer.publish(sequence);
    }
  }

事件处理程序

public class CampaignDetailsLoaderEvent implements EventHandler<CampaignCode> {
     @Override
public void onEvent(CampaignCode event, long sequence, boolean endOfBatch)
        throws Exception {
         //load details from db and process
         // udpate values to the event object
  }
 }

  public class TemplateBuildEvent implements EventHandler<CampaignCode> {
     @Override
public void onEvent(CampaignCode event, long sequence, boolean endOfBatch)
        throws Exception {
         // find the template of this type
         // set template to the event object
  }
 }

 public class CodeGenerationEventHandler implements EventHandler<CampaignCode> {
     @Override
public void onEvent(CampaignCode event, long sequence, boolean endOfBatch)
        throws Exception {
         // generate custom dsl code and execute it
         // update the output to the event object 
         //next handler will save it the db
  }
 }

  public class CampaignSaveEventHandler implements EventHandler<CampaignCode> {
     @Override
public void onEvent(CampaignCode event, long sequence, boolean endOfBatch)
        throws Exception {
         // save the details to db
         // done!
  }
 }

这是发布到环形缓冲区的正确方法吗?我需要同步“ringBuffer”对象吗?前 2 个事件并行运行,然后是第 3 个事件。当我有快速的发布者和缓慢的消费者时,我应该如何处理这个问题?我正在使用disruptor 3.1.1,我在web 环境中找不到很好的disruptor 使用示例。一个简单的代码实现,如果你做过,会帮助我理解很多!

【问题讨论】:

    标签: java disruptor-pattern lmax


    【解决方案1】:

    考虑到您所述的代码要求,此实现是正确的。最佳做法是将发布代码包装在 try-finally 块中,以确保始终发布声明的序列:

    long sequence = ringBuffer.next();
      try {
      Event e = ringBuffer.get(sequence);
      // Do some work with the event.
    } finally {
      ringBuffer.publish(sequence);
    }
    

    在构造函数中明确指定您需要一个多生产者 Disruptor 可能也是一个好主意,但这已经在您使用的默认构造函数中完成了。您应该将写入同步到RingBuffer,因为声明和发布序列号的过程已经是线程安全的。但是请注意,不能保证在并发调用 doPost() 时将事件发布到 RingBuffer 的顺序与您的 Web 应用程序接收它们的顺序相同。

    Disruptor 只是一个专门的队列,因此会遇到它们在无限增长时遇到的所有常见问题。如果缓冲区中没有可用的插槽,您对ringBuffer.next() 的调用将阻塞,直到有一个可用。您应该为RingBuffer 提供足够的容量来处理突发流量,并考虑在缓冲区已满的情况下(希望很少见)应用背压的方法。

    在您的特定用例中,如果CodeGenerationCampaignSave 步骤与前两个步骤相比花费了很长时间,并且可以推迟,那么使用额外的 Disruptors/RingBuffers 来排队事件可能是有意义的对于那些处决。

    【讨论】:

    • 这真的很有帮助,还有一个问题。当部署为集群环境时,这是否可以在不更改代码的情况下正常工作?如何集群 Disruptor?
    猜你喜欢
    • 1970-01-01
    • 2017-03-25
    • 1970-01-01
    • 2022-01-23
    • 2013-10-20
    • 2010-10-19
    • 2015-11-18
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多