【Question title】: Why are 2 Kafka consumers in Spring Boot referring to the same Kafka broker?
【Posted】: 2021-07-29 04:31:41
【Question description】:

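I'm trying to start two Kafka consumer instances from my Spring Boot application. The 2 Kafka consumers should listen to 2 different Kafka brokers (localhost:9092 and localhost:9093). Both consumers use the same topic name and group ID. However, both consumers end up listening to the same broker instance. What am I doing wrong?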

application.yml

spring:
  kafka:
    consumer:
      instance1:
        bootstrap-servers: localhost:9092
      instance2:
        bootstrap-servers: localhost:9093
      enable-auto-commit: true
      auto-commit-interval: 3000
      auto-offset-reset: latest
      topic: my-topic
      group-id: my-topic-group

KafkaListener.java

public class KafkaListener {
@org.springframework.kafka.annotation.KafkaListener(id = "instance1", groupId = "${spring.kafka.consumer.group-id}", topics = "${spring.kafka.consumer.topic}")
public void consume(String message) {
    // some logging here
}

@org.springframework.kafka.annotation.KafkaListener(id = "instance2", groupId = "${spring.kafka.consumer.group-id}", topics = "${spring.kafka.consumer.topic}", containerFactory = "listenerContainer2Factory")
public void consume(ConsumerRecord record) {
    // some logging here
}

}

KafkaConfig.java

@Configuration
@Slf4j
public class KafkaConfig {

    @Bean
    public ConsumerFactory<String, String> container1Factory(KafkaProperties kafkaProperties) {
        Map<String, Object> container1ConsumerProperties = kafkaProperties.buildConsumerProperties();
        container1ConsumerProperties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        container1ConsumerProperties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        return new DefaultKafkaConsumerFactory<>(container1ConsumerProperties);
    }

    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, String> listenerContainer1Factory(
            KafkaProperties kafkaProperties) {
        ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(container1Factory(kafkaProperties));
        return factory;
    }
    
    @Bean
    public ConsumerFactory<String, String> container2Factory(KafkaProperties kafkaProperties) {
        Map<String, Object> container2ConsumerProperties = kafkaProperties.buildConsumerProperties();
        container2ConsumerProperties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        container2ConsumerProperties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        return new DefaultKafkaConsumerFactory<>(container2ConsumerProperties);
    }

    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, String> listenerContainer2Factory(
            KafkaProperties kafkaProperties) {
        ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(container2Factory(kafkaProperties));
        return factory;
    }
}

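【Comments】:

  • Please tell us where you got the `instance1`-style configuration syntax. That really isn't something Spring Boot's auto-configuration supports. Your problem is that Spring Boot doesn't see those nested properties and just falls back to the default, which (unfortunately for you) is localhost:9092.
  • @ArtemBilan In Spring Boot, instance1 and instance2 are auto-suggested when you type spring.kafka.consumer
  • @ArtemBilan Your suggestion is correct though, because I see [Consumer clientId=consumer-my-test-group-1, groupId=my-test-group] printed twice in the console
  • @ArtemBilan For the KafkaConfig part, I followed this tutorial: howtodoinjava.com/kafka/multiple-consumers-example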

Tags: java apache-kafka spring-kafka


【Solution 1】:

This answer gave me a clue about where I went wrong: "No; Boot will only auto-configure one set of infrastructure; if you need multiple, you need to define them as beans."
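To make the failure mode concrete, here is a minimal plain-Java sketch with no Spring on the classpath. The method `buildConsumerProperties()` below is a hypothetical stand-in for `kafkaProperties.buildConsumerProperties()`. It assumes, as the comment thread suggests, that the nested `instance1`/`instance2` keys are never bound and only the default `localhost:9092` address reaches the map. Both factories in the question start from this same map, so they end up with identical settings.

```java
import java.util.HashMap;
import java.util.Map;

class SharedDefaultsSketch {

    // Hypothetical stand-in for KafkaProperties.buildConsumerProperties().
    // Assumption: the nested instance1/instance2 keys are ignored, so only
    // the default broker address and the shared consumer settings appear.
    static Map<String, Object> buildConsumerProperties() {
        Map<String, Object> props = new HashMap<>();
        props.put("bootstrap.servers", "localhost:9092"); // default fallback
        props.put("group.id", "my-topic-group");
        props.put("auto.offset.reset", "latest");
        return props;
    }

    public static void main(String[] args) {
        // Mirrors container1Factory and container2Factory in the question:
        // each calls the same builder and changes nothing broker-specific.
        Map<String, Object> container1 = buildConsumerProperties();
        Map<String, Object> container2 = buildConsumerProperties();

        System.out.println(container1.get("bootstrap.servers")); // localhost:9092
        System.out.println(container2.get("bootstrap.servers")); // localhost:9092
        System.out.println(container1.equals(container2));       // true
    }
}
```

Since neither map carries a broker-specific address, both consumers connect to the same broker, which matches the identical client log line reported in the comments.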

I added the bootstrap server details during bean initialization, as follows:

container1ConsumerProperties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");

For container 2:

container2ConsumerProperties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9093");
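The two overrides above follow one pattern: copy the shared consumer settings, then replace only the broker address. Below is a minimal plain-Java sketch of that pattern. The names `sharedConsumerProperties` and `consumerPropertiesFor` are hypothetical helpers, not part of Spring or Kafka. The key `"bootstrap.servers"` is the string value behind `ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG`.

```java
import java.util.HashMap;
import java.util.Map;

class PerClusterPropsSketch {

    // Hypothetical stand-in for the shared settings that
    // kafkaProperties.buildConsumerProperties() returns in the question.
    static Map<String, Object> sharedConsumerProperties() {
        Map<String, Object> props = new HashMap<>();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("group.id", "my-topic-group");
        props.put("auto.offset.reset", "latest");
        return props;
    }

    // Copies the shared settings and overrides only the broker address,
    // so each consumer factory gets its own independent map.
    static Map<String, Object> consumerPropertiesFor(String bootstrapServers) {
        Map<String, Object> props = new HashMap<>(sharedConsumerProperties());
        props.put("bootstrap.servers", bootstrapServers);
        return props;
    }

    public static void main(String[] args) {
        Map<String, Object> container1 = consumerPropertiesFor("localhost:9092");
        Map<String, Object> container2 = consumerPropertiesFor("localhost:9093");

        System.out.println(container1.get("bootstrap.servers")); // localhost:9092
        System.out.println(container2.get("bootstrap.servers")); // localhost:9093
        System.out.println(container2.get("group.id"));          // my-topic-group
    }
}
```

In the real configuration, each resulting map would be passed to its own `DefaultKafkaConsumerFactory`, as in the `KafkaConfig` class from the question. One further point worth checking: the first listener in the question does not name a `containerFactory`, so spring-kafka looks for a bean named `kafkaListenerContainerFactory` instead of `listenerContainer1Factory`. Adding `containerFactory = "listenerContainer1Factory"` to that annotation would tie it explicitly to the first factory.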
