【问题标题】:How to reuse Spring Kafka code to setup multiple listeners?如何重用 Spring Kafka 代码来设置多个监听器?
【发布时间】:2020-12-18 22:53:12
【问题描述】:

如果我的应用程序有一个侦听器,那么情况就很简单了:

我将配置spring.kafka.*,Spring Boot会将配置解析为KafkaPropeties并初始化所有需要的bean。

但我有两个监听器,假设一个将记录保存到文件,第二个保存到数据库。我希望尽可能多地重用 Spring Boot 约定。

我想配置file.kafka.*db.kafka.*,将这些配置解析为2个KafkaProperties beans:

@ConfigurationProperties("file.kafka")
@Bean(autowireCandidate = false)
KafkaProperties fileKafkaProperties() {
  return new KafkaProperties();
}

@ConfigurationProperties("db.kafka")
@Bean(autowireCandidate = false)
KafkaProperties dbKafkaProperties() {
  return new KafkaProperties();
}

最后分别创建 2 个ConcurrentKafkaListenerContainerFactory: db 和 file。

问题是要将KafkaPropeties 应用于ConcurrentKafkaListenerContainerFactory 我需要ConcurrentKafkaListenerContainerFactoryConfigurer。但是ConcurrentKafkaListenerContainerFactoryConfigurer 不够公开,无法使用KafkaPropeties 进行初始化。

ConcurrentKafkaListenerContainerFactoryConfigurer 是否存在公共访问限制问题或者我有什么问题?

【问题讨论】:

    标签: java spring-boot spring-kafka


    【解决方案1】:

    您可以从KafkaProperties 构建ConsumerFactory

    @Bean
    @ConditionalOnMissingBean(name="kafkaConsumerFactory")
    public ConsumerFactory<String, String> kafkaConsumerFactory(KafkaProperties kafkaProperties) {
    return new DefaultKafkaConsumerFactory<>(
            kafkaProperties.buildConsumerProperties(),
            new StringDeserializer(),
            new StringDeserializer());
    }
    

    使用ConsumerFactory 构建ConcurrentKafkaListenerContainerFactory

    @Bean
    KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<Integer, String>>
                    kafkaListenerContainerFactory() {
    ConcurrentKafkaListenerContainerFactory<Integer, String> factory =
                            new ConcurrentKafkaListenerContainerFactory<>();
    factory.setConsumerFactory(kafkaConsumerFactory());
    factory.setConcurrency(3);
    factory.getContainerProperties().setPollTimeout(3000);
    return factory;
    }
    

    【讨论】:

    • 我完全按照您的说明进行操作,但令人厌恶的是我可以使用 file.kafka.consumer.* 配置 ConsumerFactory 但我无法使用 file.kafka.listener.* 配置 ListenerContainerFactory
    猜你喜欢
    • 2022-01-03
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2020-10-03
    • 2019-07-23
    • 2014-09-07
    • 2022-01-12
    • 1970-01-01
    相关资源
    最近更新 更多