【问题标题】:Problems adding multiple KafkaListenerContainerFactories添加多个 KafkaListenerContainerFactories 的问题
【发布时间】:2017-08-25 19:41:52
【问题描述】:

您好,我目前正在涉足 Spring Kafka,并成功地将单个 KafkaListenerContainerFactory 添加到我的侦听器中。现在我想添加多个 KafkaListenerContainerFactory(一个用于将在 json 中包含消息的主题,另一个用于字符串)。见以下代码:

@EnableKafka
@Configuration
public class KafkaConsumersConfig {

    private final KafkaConfiguration kafkaConfiguration;

    @Autowired
    public KafkaConsumersConfig(KafkaConfiguration kafkaConfiguration) {
        this.kafkaConfiguration = kafkaConfiguration;
    }

    @Bean
    public KafkaListenerContainerFactory<?> kafkaJsonListenerContainerFactory(){
        ConcurrentKafkaListenerContainerFactory<String,Record> factory = new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(jsonConsumerFactory());
        factory.setConcurrency(3);
        factory.setAutoStartup(true);
        return factory;
    }

    @Bean
    public ConsumerFactory<String,Record> jsonConsumerFactory(){
        JsonDeserializer<Record> jsonDeserializer = new JsonDeserializer<>(Record.class);
        return new DefaultKafkaConsumerFactory<>(jsonConsumerConfigs(),new StringDeserializer(), jsonDeserializer);
    }

    @Bean
    public Map<String,Object> jsonConsumerConfigs(){
        Map<String,Object> propsMap = new HashMap<>();
        propsMap.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,  kafkaConfiguration.getBrokerAddress());
        propsMap.put(ConsumerConfig.GROUP_ID_CONFIG, kafkaConfiguration.getJsonGroupId());
        propsMap.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, kafkaConfiguration.getAutoCommit());
        propsMap.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, kafkaConfiguration.getAutoCommitInterval());
        propsMap.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, kafkaConfiguration.getSessionTimeout());
        return propsMap;
    }
    @Bean
    public KafkaListenerContainerFactory<?> kafkaFileListenerContainerFactory(){
        ConcurrentKafkaListenerContainerFactory<String,String> factory = new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(fileConsumerFactory());
        factory.setConcurrency(3);
        factory.setAutoStartup(true);
        return factory;
    }

    @Bean
    public ConsumerFactory<String,String> fileConsumerFactory(){
        return new DefaultKafkaConsumerFactory<>(fileConsumerConfigs());
    }

    @Bean
    public Map<String,Object> fileConsumerConfigs(){
        Map<String,Object> propsMap = new HashMap<>();
        propsMap.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,  kafkaConfiguration.getBrokerAddress());
        propsMap.put(ConsumerConfig.GROUP_ID_CONFIG, kafkaConfiguration.getFileGroupId());
        propsMap.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, kafkaConfiguration.getAutoCommit());
        propsMap.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, kafkaConfiguration.getAutoCommitInterval());
        propsMap.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, kafkaConfiguration.getSessionTimeout());
        return propsMap;
    }
}

运行它会给我以下错误:

Description:

Parameter 1 of method kafkaListenerContainerFactory in org.springframework.boot.autoconfigure.kafka.KafkaAnnotationDrivenConfiguration required a bean of type 'org.springframework.kafka.core.ConsumerFactory' that could not be found.
    - Bean method 'kafkaConsumerFactory' in 'KafkaAutoConfiguration' not loaded because @ConditionalOnMissingBean (types: org.springframework.kafka.core.ConsumerFactory; SearchStrategy: all) found beans 'jsonConsumerFactory', 'fileConsumerFactory'


Action:

Consider revisiting the conditions above or defining a bean of type 'org.springframework.kafka.core.ConsumerFactory' in your configuration.

我做错了什么?

【问题讨论】:

    标签: spring spring-boot apache-kafka spring-kafka


    【解决方案1】:

    看起来您不会依赖 Spring Boot 的 Kafka Auto Configuration

    Spring Boot 中提供了KafkaAutoConfiguration:

    @Bean
    @ConditionalOnMissingBean(ConsumerFactory.class)
    public ConsumerFactory<?, ?> kafkaConsumerFactory() {
    

    由于您有 jsonConsumerFactoryfileConsumerFactory,它们会覆盖自动配置提供的那个。

    但另一方面,在KafkaAnnotationDrivenConfiguration,你的工厂都不能申请:

    @Bean
    @ConditionalOnMissingBean(name = "kafkaListenerContainerFactory")
    public ConcurrentKafkaListenerContainerFactory<?, ?> kafkaListenerContainerFactory(
            ConcurrentKafkaListenerContainerFactoryConfigurer configurer,
            ConsumerFactory<Object, Object> kafkaConsumerFactory) {
    

    因为您的 ConsumerFactory bean 不是 ConsumerFactory&lt;Object, Object&gt; 类型。

    所以:

    • 只需将以下内容添加到应用程序属性文件中,即可将 KafkaAutoConfiguration 从 Spring Boot 自动配置中排除: spring.autoconfigure.exclude=org.springframework.boot.autoconfigure.kafka.KafkaAutoConfiguration
    • 或将您的 KafkaListenerContainerFactory bean 之一重命名为 kafkaListenerContainerFactory 以在引导中覆盖它
    • 或将ConsumerFactory bean 之一设为ConsumerFactory&lt;Object, Object&gt; 类型。

    【讨论】:

      【解决方案2】:

      我已经在下面的代码中实现了它,它对我来说工作正常。

      // LISTENER 1
      @Bean
      @ConditionalOnMissingBean(name = "yourListenerFactory1")
      public ConsumerFactory<String, YourCustomObject1> yourConsumerFactory1() {
         Map<String, Object> props = new HashMap<>();
         props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
         props.put(ConsumerConfig.GROUP_ID_CONFIG, "YOUR-GROUP-1");
         return new DefaultKafkaConsumerFactory<>(props, new StringDeserializer(),
           new JsonDeserializer<>(YourCustomObject1.class));
      }
      
      
      @Bean(name = "yourListenerFactory1")
      public ConcurrentKafkaListenerContainerFactory<String, YourCustomObject1> 
        yourListenerFactory1() {
         ConcurrentKafkaListenerContainerFactory<String, YourCustomObject1> factory =
             new ConcurrentKafkaListenerContainerFactory<>();
         factory.setConsumerFactory(yourConsumerFactory1());
         ContainerProperties containerProperties = factory.getContainerProperties();
         containerProperties.setPollTimeout(...);
         containerProperties.setAckMode(AckMode...);
         return factory;
      }
      
      
      // LISTENER 2
      @Bean
      @ConditionalOnMissingBean(name = "yourListenerFactory2")
      public ConsumerFactory<String, YourCustomObject2> yourConsumerFactory2() {
         Map<String, Object> props = new HashMap<>();
         props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
         props.put(ConsumerConfig.GROUP_ID_CONFIG, "YOUR-GROUP-2");
         return new DefaultKafkaConsumerFactory<>(props, new StringDeserializer(),
            new JsonDeserializer<>(YourCustomObject2.class));
      }
      
      
      @Bean(name = "yourListenerFactory2")
      public ConcurrentKafkaListenerContainerFactory<String, YourCustomObject2> 
         yourListenerFactory2() {
          ConcurrentKafkaListenerContainerFactory<String, YourCustomObject2> factory 
               =  new ConcurrentKafkaListenerContainerFactory<>();
          factory.setConsumerFactory(yourConsumerFactory2());
          ContainerProperties containerProperties = factory.getContainerProperties();
          containerProperties.setPollTimeout(...);
          containerProperties.setAckMode(AckMode...);
          return factory;
       }
      

      另外,我已将 spring.autoconfigure.exclude 属性设置为 必须 spring.autoconfigure.exclude=org.springframework.boot.autoconfigure.kafka.KafkaAutoConfiguration

      这是我的消费者配置

      消费者 1

      @KafkaListener(id = "your-cousumer-1",
        topicPattern = "your-topic-1",
        containerFactory = "yourListenerFactory1")
       public void consumer1(YourCustomObject1 data,
                             Acknowledgment acknowledgment,
            @Header(KafkaHeaders.RECEIVED_PARTITION_ID) List<Integer> partitions,
            @Header(KafkaHeaders.RECEIVED_TOPIC) List<String> topics,
            @Header(KafkaHeaders.OFFSET) List<Long> offsets) throws Exception { ... }
      

      消费者 2

        @KafkaListener(id = "your-cousumer-2",
                       topicPattern = "your-topic-2",
                       containerFactory = "yourListenerFactory2")
        public void consumer2(YourCustomObject2 data,
                              Acknowledgment acknowledgment,
            @Header(KafkaHeaders.RECEIVED_PARTITION_ID) List<Integer> partitions,
            @Header(KafkaHeaders.RECEIVED_TOPIC) List<String> topics,
            @Header(KafkaHeaders.OFFSET) List<Long> offsets) throws Exception { ...  }
      

      另外,我的 kafka 模板是

      @Autowired
      KafkaTemplate<String, Object> kafkaTemplate;
      

      【讨论】:

      • 如果您对不同的主题使用相同的组 ID,这会起作用吗?
      • 我没试过。我已将其用于同一组的多个主题。它有效。
      • 我认为这只是因为您将KafkaAutoConfiguration 设置为关闭了吗?您对 @ConditionalOnMissingBean(name = "yourListenerFactory1") 所做的事情似乎没有多大意义,因为您只有在您的 ConcurrentKafkaListenerContainerFactory 不存在时才启用 ConsumerFactory。
      【解决方案3】:

      您可以在KafkaListener定义中定义每个容器工厂如下:

      @KafkaListener(topics = "fileTopic", containerFactory = "kafkaFileListenerContainerFactory")
      public void fileConsumer(...) {...}
      
      @KafkaListener(topics = "jsonTopic", containerFactory = "kafkaJsonListenerContainerFactory")
      public void jsonConsumer(...) {...}
      

      【讨论】:

      • 你不应该在那里传递一个 KafkaListenerContainerFactory 而不是 ConsumerFactory 吗?不要认为这行得通
      • @Rodrigo 你是对的。我写错了。我已经编辑了我的答案。谢谢...
      猜你喜欢
      • 1970-01-01
      • 2021-10-01
      • 2018-07-21
      • 2021-12-04
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      相关资源
      最近更新 更多