【问题标题】:How to trigger and handle these Spring Kafka events如何触发和处理这些 Spring Kafka 事件
【发布时间】:2019-05-07 06:10:58
【问题描述】:

在我有许多 Spring Kafka 消费者的 Spring Boot 项目中,我添加了一些事件侦听器来监控这些消费者的健康状况。代码如下:

@Component
public class ApplicationContextListeningService {

    @EventListener
    public void handleConsumerPausedEvent(ConsumerPausedEvent event) {
        LOGGER_ERROR.warn(WARNING_KAFKA_CONSUMERPAUSEDEVENT + event.getSource() + LOG_MSG_DELIMITER + event.toString());
    }

    @EventListener
    public void handleConsumerResumedEvent(ConsumerResumedEvent event) {
        LOGGER_ERROR.warn(WARNING_KAFKA_CONSUMERRESUMEDEVENT + event.getSource() + LOG_MSG_DELIMITER + event.toString());
    }

    @EventListener
    public void handleConsumerStoppedEvent(ConsumerStoppedEvent event) {
        LOGGER_ERROR.error(ERROR_KAFKA_CONSUMERSTOPPEDEVENT + event.getSource() + LOG_MSG_DELIMITER + event.toString());
    }

    @EventListener
    public void handleListenerContainerIdleEvent(ListenerContainerIdleEvent event) {
        LOGGER_ERROR.error(ERROR_KAFKA_LISTENERCONTAINERIDLEEVENT + event.getListenerId() + LOG_MSG_DELIMITER + event.toString());
    }

    @EventListener
    public void handleNonResponsiveConsumerEvent(NonResponsiveConsumerEvent event) {
        LOGGER_ERROR.error(ERROR_KAFKA_NONRESPONSIVECONSUMEREVENT + event.getListenerId() + LOG_MSG_DELIMITER + event.toString());
    }

}

有谁知道在什么情况下会抛出这些事件(也许我可以如何手动触发这些事件以进行测试)?对于最后三个事件(ConsumerStoppedEvent、ListenerContainerIdleEvent 和 NonResponsiveConsumerEvent),当我得到其中一个时,是否需要人工干预来解决问题(例如重新启动服务器以再次创建消费者)? 谢谢!

【问题讨论】:

    标签: spring-kafka spring-kafka-test


    【解决方案1】:

    您可以通过将 Mock 消费者工厂注入容器来模拟它们。

    • ConsumerStoppedEvent 在您 stop() 容器时发出。
    • ListenerContainerIdleEvent 只是表示idleEventInterval 中没有收到任何记录,所以通常并不意味着有问题。
    • NonResponsiveConsumerEvent - 很难说;对于较旧的客户端,如果服务器关闭,poll() 将阻塞,因此我们无法发出空闲事件(或执行任何操作)。

    我不知道您是否还能通过更新的客户获得它们;但要模拟它,您只需在模拟使用者 poll() 方法中阻塞足够长的时间,以便监控任务检测到问题并发出事件。

    【讨论】:

    • 谢谢@Gary Russell!我正在运行 Spring Boot 2.1.0 和 Spring Kafka 2.2.0。我一直在低层环境中不时检查我的 Kafka 消费者日志。到目前为止,我只在日志中看到了 NonResponsiveConsumerEvent。我注意到的是,它似乎对消费者没有任何影响,他们毫不犹豫地不断获取。我实现它的方式是三个消费者共享一个执行者。消费者在提交一批消息之前检查执行者的深度。如果深度超过阈值,消费者必须等待......
    • batch的kafka offset只有在batch提交给executor后才会提交。因此,当消费者必须等待较长时间时,最有可能发生这种情况。
    • 首先,你应该尽量保持最新版本;当前boot 2.1版本是2.1.4,当前spring kafka版本是2.2.5。其次,如果你像这样挂起你的监听线程,你需要确保消费者属性max.poll.interval.ms足够大,这样kafka就不会认为消费者已经死了。最后,为了抑制“虚假的”非响应消费者事件,您应该确保pollTimeout * noPollThreshold(容器属性)大于您希望暂停线程的时间。
    • 感谢@Gary Russell 的建议!我会相应地修改配置!
    猜你喜欢
    • 1970-01-01
    • 1970-01-01
    • 2014-05-07
    • 2022-01-08
    • 2021-04-07
    • 2011-01-03
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多