【问题标题】:Disruptor - EventHandlers not invokedDisruptor - 未调用事件处理程序
【发布时间】:2011-11-29 11:10:22
【问题描述】:

我正在使用Disruptor 框架,发现我的事件处理程序没有被调用。

这是我的设置代码:

private static final int BUFFER_SIZE = 1024 * 8;
private final ExecutorService  EXECUTOR = Executors.newSingleThreadExecutor();

private void initializeDisruptor() {
    if (disruptor != null)
        return;

    disruptor = 
            new Disruptor<TwitterStatusReceivedEvent>(TwitterStatusReceivedEvent.EVENT_FACTORY, EXECUTOR,
                    new SingleThreadedClaimStrategy(BUFFER_SIZE),
                    new SleepingWaitStrategy());
    disruptor.handleEventsWith(searchTermMatchingHandler)
        .then(appendStatusHandler,updatePriceHandler).then(persistUpdatesHandler);

    this.ringBuffer = disruptor.start();
}

在其他地方,我发布活动。我已经尝试了以下两种方法:

事件发布方式A:

private void handleStatus(final Status status)
{

    long sequence = ringBuffer.next();
    TwitterStatusReceivedEvent event = ringBuffer.get(sequence);
    event.setStatus(status);
    event.setSearchInstruments(searchInstruments);
    ringBuffer.publish(sequence);
}

在这种情况下,我发现第一个 EventHandler 被调用,但除此之外再也没有。

事件发布方式B:

private void handleStatus(final Status status)
{
    disruptor.publishEvent(new EventTranslator<TwitterStatusReceivedEvent>() {

        @Override
        public TwitterStatusReceivedEvent translateTo(
                TwitterStatusReceivedEvent event, long sequence) {
            event.setStatus(status);
            event.setSearchInstruments(searchInstruments);
            return event;
        }
    });
}

在这种情况下,我发现根本没有调用任何事件处理程序。

我做错了什么?

更新

这是我的 EventHandler 的全部内容。我应该如何发出处理完成的信号?

public class SearchTermMatchingEventHandler implements EventHandler<TwitterStatusReceivedEvent> {

    @Override
    public void onEvent(TwitterStatusReceivedEvent event, long sequence,
            boolean endOfBatch) throws Exception {
        String statusText = event.getStatus().getText();
        for (Instrument instrument : event.getSearchInstruments())
        {
            if (statusText.contains(instrument.getSearchTerm()))
            {
                event.setMatchedInstrument(instrument);
                break;
            }
        }
    }

}

【问题讨论】:

    标签: java disruptor-pattern


    【解决方案1】:

    每个事件处理程序都需要在其自己的线程中运行,该线程在您关闭中断程序之前不会退出。由于您使用的是单线程执行程序,因此只有恰好执行的第一个事件处理程序才会运行。 (Disruptor 类将每个处理程序存储在一个 hashmap 中,因此最终运行的处理程序会有所不同)

    如果您切换到缓存线程池,您应该会发现它开始运行。您不需要对序列号进行任何管理,因为这一切都由 Disruptor 类为您设置和管理的 EventProcessor 处理。只处理你得到的每个事件是完全正确的。

    【讨论】:

    • 如果序列号由 Disruptor 类管理,请问为什么序列号是 onEvent() 的输入?谢谢。
    • 序列号是一个非常有用的信息。例如,LMAX 通过 JMX 公开每个消费者的当前序列号,这有助于我们监控系统是否按预期运行、消费者落后多远等。我们还使用该序列号作为消息的 ID 以支持可靠消息传递和其他各种功能。
    【解决方案2】:

    您需要确保您的 searchTermMatchingHandler 在处理事件后更新其序列号。更下游的事件处理程序(appendStatusHandler、updatePriceHandler、persistUpdatesHandler)将检查 searchTermMatchingHandler 序列号,以查看它们可以从环形缓冲区中提取哪些事件。

    【讨论】:

    • 感谢特丽莎的回复。我不确定我是否理解。我已经更新了我的问题以显示我的 EventHandler,但是我没有在 EventHandler 界面中看到我应该如何向未来的处理程序发出工作已完成的信号? EventHandler 接口没有暴露序列号?
    • Marty,Adrian 是对的,你不必担心这个,因为 Disruptor 类会为你处理这个问题,很抱歉造成混乱。
    【解决方案3】:

    我遇到了同样的问题,但这是因为我使用 Spring(Java 配置)实例化了 Disruptor,并且在与 Disruptor 相同的 @Bean 方法中实例化了 Executor。

    我通过在单独的 @Bean 方法中实例化 Executor 解决了这个问题。

    【讨论】:

      猜你喜欢
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 2014-11-24
      • 2023-03-11
      • 2015-12-18
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      相关资源
      最近更新 更多