【问题标题】:How to stop micro service with Spring Kafka Listener, when connection to Apache Kafka Server is lost?当与 Apache Kafka 服务器的连接丢失时,如何使用 Spring Kafka Listener 停止微服务?
【发布时间】:2018-04-14 05:06:12
【问题描述】:

我目前正在实现一个微服务,它从 Apache Kafka 主题中读取数据。我为微服务使用“spring-boot,版本:1.5.6.RELEASE”,在同一个微服务中为监听器使用“spring-kafka,版本:1.2.2.RELEASE”。这是我的 kafka 配置:

    @Bean
public Map<String, Object> consumerConfigs() {
    return new HashMap<String, Object>() {{
        put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, servers);
        put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        put(ConsumerConfig.GROUP_ID_CONFIG, groupIdConfig);
        put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, autoOffsetResetConfig);
    }};
}

@Bean
public ConsumerFactory<String, String> consumerFactory() {
    return new DefaultKafkaConsumerFactory<>(consumerConfigs());
}

@Bean
public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory() {
    ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
    factory.setConsumerFactory(consumerFactory());
    return factory;
}

我已经通过@KafkaListener注解实现了监听器:

@KafkaListener(topics = "${kafka.dataSampleTopic}")
public void receive(ConsumerRecord<String, String> payload) {
    //business logic
    latch.countDown();
}

当侦听器失去与 Apache Kafka 服务器的连接时,我需要能够关闭微服务。

当我杀死 kafka 服务器时,我在 spring boot 日志中收到以下消息:

2017-11-01 19:58:15.721  INFO 16800 --- [      0-C-1] o.a.k.c.c.internals.AbstractCoordinator  : Marking the coordinator 192.168.0.4:9092 (id: 2145482646 rack: null) dead for group TestGroup

当我启动 kafka sarver 时,我得到:

2017-11-01 20:01:37.748  INFO 16800 --- [      0-C-1] o.a.k.c.c.internals.AbstractCoordinator  : Discovered coordinator 192.168.0.4:9092 (id: 2145482646 rack: null) for group TestGroup.

很明显,我的微服务中的 Spring Kafka Listener 能够检测 Kafka 服务器何时启动并运行,何时未启动。在 confluent Kafka The Definitive Guide 的书中But How Do We Exit? 章节中,据说需要在消费者上调用wakeup() 方法,以便抛出WakeupException。因此,我尝试使用@EventListener 标记捕获两个事件(Kafka 服务器关闭和Kafka 服务器启动),如Spring for Apache Kafka documentation 中所述,然后调用wakeup()。但是文档中的示例是关于如何检测空闲消费者的,这不是我的情况。有人可以帮我解决这个问题。提前致谢。

【问题讨论】:

    标签: spring spring-boot apache-kafka spring-kafka


    【解决方案1】:

    我不知道如何获得服务器停机情况的通知(根据我的经验,消费者在poll() 中进入了一个紧密循环)。

    但是,如果您发现了这一点,您可以停止侦听器容器,这将唤醒消费者并退出紧密循环...

    @Autowired
    private KafkaListenerEndpointRegistry registry;
    
    ...
    
        this.registry.stop();
    

    2017-11-01 16:29:54.290 信息 21217 --- [广告 | so47062346] o.a.k.c.c.internals.AbstractCoordinator :将协调器 localhost:9092 (id: 2147483647 rack: null) 标记为组 so47062346 已死

    2017-11-01 16:29:54.346 WARN 21217 --- [ntainer#0-0-C-1] org.apache.kafka.clients.NetworkClient:无法建立到节点 0 的连接。经纪人可能不可用。

    ...

    2017-11-01 16:30:00.643 WARN 21217 --- [ntainer#0-0-C-1] org.apache.kafka.clients.NetworkClient:无法建立到节点 0 的连接。经纪人可能不可用。

    2017-11-01 16:30:00.680 INFO 21217 --- [ntainer#0-0-C-1] essageListenerContainer$ListenerConsumer:消费者停止

    您可以通过添加 reconnect.backoff.ms 来改进紧密循环,但 poll() 永远不会退出,因此我们无法发出空闲事件。

    spring:
      kafka:
        consumer:
          enable-auto-commit: false
          group-id: so47062346
        properties:
          reconnect.backoff.ms: 1000
    

    我想您可以启用空闲事件并使用计时器来检测您是否在一段时间内没有收到任何数据(或空闲事件),然后停止容器。

    【讨论】:

    • 有一些讨论here。我刚刚添加了一条评论以添加一些检测机制。
    • 我也将此问题添加到spring-kafka
    • 我实现了@EventListener({NonResponsiveConsumerEvent.class}) 并且它有效。谢谢! :)
    猜你喜欢
    • 1970-01-01
    • 1970-01-01
    • 2021-09-23
    • 1970-01-01
    • 2021-12-20
    • 2018-12-23
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多