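【Posted】: 2021-07-29 04:31:41
【Question】:
I am trying to start two Kafka consumer instances from my Spring Boot application. The two consumers should listen to two different Kafka brokers (localhost:9092 and localhost:9093). Both consumers use the same topic name and group ID. However, both consumers end up listening to the same broker. What am I doing wrong?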
application.yml
spring:
  kafka:
    consumer:
      instance1:
        bootstrap-servers: localhost:9092
      instance2:
        bootstrap-servers: localhost:9093
      enable-auto-commit: true
      auto-commit-interval: 3000
      auto-offset-reset: latest
      topic: my-topic
      group-id: my-topic-group
KafkaListener.java
public class KafkaListener {

    @org.springframework.kafka.annotation.KafkaListener(id = "instance1", groupId = "${spring.kafka.consumer.group-id}", topics = "${spring.kafka.consumer.topic}")
    public void consume(String message) {
        // some logging here
    }

    @org.springframework.kafka.annotation.KafkaListener(id = "instance2", groupId = "${spring.kafka.consumer.group-id}", topics = "${spring.kafka.consumer.topic}", containerFactory = "listenerContainer2Factory")
    public void consume(ConsumerRecord record) {
        // some logging here
    }
}
KafkaConfig.java
@Configuration
@Slf4j
public class KafkaConfig {

    @Bean
    public ConsumerFactory<String, String> container1Factory(KafkaProperties kafkaProperties) {
        Map<String, Object> container1ConsumerProperties = kafkaProperties.buildConsumerProperties();
        container1ConsumerProperties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        container1ConsumerProperties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        return new DefaultKafkaConsumerFactory<>(container1ConsumerProperties);
    }

    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, String> listenerContainer1Factory(
            KafkaProperties kafkaProperties) {
        ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(container1Factory(kafkaProperties));
        return factory;
    }

    @Bean
    public ConsumerFactory<String, String> container2Factory(KafkaProperties kafkaProperties) {
        Map<String, Object> container2ConsumerProperties = kafkaProperties.buildConsumerProperties();
        container2ConsumerProperties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        container2ConsumerProperties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        return new DefaultKafkaConsumerFactory<>(container2ConsumerProperties);
    }

    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, String> listenerContainer2Factory(
            KafkaProperties kafkaProperties) {
        ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(container2Factory(kafkaProperties));
        return factory;
    }
}
【Discussion】:
-
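Please tell us where you got the syntax for that instance1-style configuration. It is really not part of Spring Boot's auto-configuration. Your problem is that Spring Boot does not see those nested properties and simply falls back to the default, which (unfortunately for you) is localhost:9092.
-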
@ArtemBilan In Spring Boot, instance1 and instance2 are auto-suggested when you access properties via spring.kafka.consumer
-
@ArtemBilan Your suggestion is correct, though, because I see [Consumer clientId=consumer-my-test-group-1, groupId=my-test-group] printed twice in the console
-
@ArtemBilan For the KafkaConfig part, I followed this tutorial: howtodoinjava.com/kafka/multiple-consumers-example
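To illustrate the diagnosis in the first comment: both consumer factories in KafkaConfig start from the same `kafkaProperties.buildConsumerProperties()` map, so unless each one overrides the broker address, they share whatever default that map contains. The following is a minimal, library-free sketch of that idea, not the accepted fix from the thread. The class name `PerBrokerConsumerProps` and the helper `forBroker` are hypothetical, and a plain `HashMap` stands in for the map Spring Boot would build. The key `"bootstrap.servers"` is the string behind `ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG`.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical helper class, for illustration only.
final class PerBrokerConsumerProps {

    // Same string value as ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG in kafka-clients.
    static final String BOOTSTRAP_SERVERS = "bootstrap.servers";

    // Returns a copy of the shared consumer properties with the broker address replaced.
    // The shared map itself is left untouched.
    static Map<String, Object> forBroker(Map<String, Object> shared, String bootstrapServers) {
        Map<String, Object> props = new HashMap<>(shared);
        props.put(BOOTSTRAP_SERVERS, bootstrapServers);
        return props;
    }

    public static void main(String[] args) {
        // Assumption: stand-in for kafkaProperties.buildConsumerProperties().
        // The custom instance1/instance2 keys are not bound, so only the default broker is present.
        Map<String, Object> shared = new HashMap<>();
        shared.put(BOOTSTRAP_SERVERS, "localhost:9092");
        shared.put("group.id", "my-topic-group");

        // Each consumer factory would receive its own map with its own broker address.
        Map<String, Object> instance1 = forBroker(shared, "localhost:9092");
        Map<String, Object> instance2 = forBroker(shared, "localhost:9093");

        System.out.println("instance1 -> " + instance1.get(BOOTSTRAP_SERVERS));
        System.out.println("instance2 -> " + instance2.get(BOOTSTRAP_SERVERS));
    }
}
```

In the KafkaConfig class from the question, the equivalent step would presumably be a call such as `container2ConsumerProperties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9093")` before the `DefaultKafkaConsumerFactory` is created, with the address read from wherever the application chooses to keep it.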
Tags: java apache-kafka spring-kafka