【发布时间】:2018-04-14 05:06:12
【问题描述】:
我目前正在实现一个微服务,它从 Apache Kafka 主题中读取数据。我为微服务使用“spring-boot,版本:1.5.6.RELEASE”,在同一个微服务中为监听器使用“spring-kafka,版本:1.2.2.RELEASE”。这是我的 kafka 配置:
@Bean
public Map<String, Object> consumerConfigs() {
return new HashMap<String, Object>() {{
put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, servers);
put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
put(ConsumerConfig.GROUP_ID_CONFIG, groupIdConfig);
put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, autoOffsetResetConfig);
}};
}
@Bean
public ConsumerFactory<String, String> consumerFactory() {
return new DefaultKafkaConsumerFactory<>(consumerConfigs());
}
@Bean
public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory());
return factory;
}
我已经通过@KafkaListener注解实现了监听器:
@KafkaListener(topics = "${kafka.dataSampleTopic}")
public void receive(ConsumerRecord<String, String> payload) {
//business logic
latch.countDown();
}
当侦听器失去与 Apache Kafka 服务器的连接时,我需要能够关闭微服务。
当我杀死 kafka 服务器时,我在 spring boot 日志中收到以下消息:
2017-11-01 19:58:15.721 INFO 16800 --- [ 0-C-1] o.a.k.c.c.internals.AbstractCoordinator : Marking the coordinator 192.168.0.4:9092 (id: 2145482646 rack: null) dead for group TestGroup
当我启动 kafka sarver 时,我得到:
2017-11-01 20:01:37.748 INFO 16800 --- [ 0-C-1] o.a.k.c.c.internals.AbstractCoordinator : Discovered coordinator 192.168.0.4:9092 (id: 2145482646 rack: null) for group TestGroup.
很明显,我的微服务中的 Spring Kafka Listener 能够检测 Kafka 服务器何时启动并运行,何时未启动。在 confluent Kafka The Definitive Guide 的书中But How Do We Exit? 章节中,据说需要在消费者上调用wakeup() 方法,以便抛出WakeupException。因此,我尝试使用@EventListener 标记捕获两个事件(Kafka 服务器关闭和Kafka 服务器启动),如Spring for Apache Kafka documentation 中所述,然后调用wakeup()。但是文档中的示例是关于如何检测空闲消费者的,这不是我的情况。有人可以帮我解决这个问题。提前致谢。
【问题讨论】:
标签: spring spring-boot apache-kafka spring-kafka