【问题标题】:How to load Kafka Consumer lazily in Spring boot?如何在 Spring Boot 中懒加载 Kafka Consumer?
【发布时间】:2020-11-30 23:59:02
【问题描述】:

我想通过命令行参数提供组 ID,但是当我尝试这样做时出现以下错误。

Failed to start bean 'org.springframework.kafka.config.internalKafkaListenerEndpointRegistry'; nested exception is java.lang.IllegalStateException: No group.id found in consumer config, container properties, or @KafkaListener annotation; a group.id is required when group management is used.

这意味着在加载 kafkalistener 时它需要 group_id。如果我在 consumerConfig 文件中提供了 groupId,那么它可以正常工作。

那么有什么方法可以让我通过命令行提供组 ID 并延迟加载 kafka 侦听器,这样我在程序启动时就不需要了。

我的消费者配置:

@Configuration
class KafkaConsumerConfig {

    @Value("${kafka.bootstrap-servers}")
    private String bootstrapServers;

    @Autowired
    private ArgumentModel argumentModel;

    private Logger logger = LoggerFactory.getLogger(KafkaConsumerConfig.class);

    @Bean
    public Map<String, Object> consumerConfigs() {
        Map<String, Object> props = new HashMap<>();
        logger.info("bootstrapServers : {}", bootstrapServers);
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.GROUP_ID_CONFIG, argumentModel.getKafkaGroupId());
        return props;
    }

    @Bean
    public ConsumerFactory<String, String> consumerFactory() {
        return new DefaultKafkaConsumerFactory<>(consumerConfigs());
    }

    @Bean
    public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, String>> kafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());
        return factory;
    }
}

【问题讨论】:

  • 什么是 ArgumentModel?是您的自定义代码吗?如果是这样,为什么不使用可以注入consumerConfigs bean的ConfigurationProperties?
  • 在那种情况下它的工作,但我想通过命令行。因此,当程序启动时,我读取 kafkaGroupId 的值并将其保存到模型中,然后在这里使用它。但是当我启动spring程序时,它直接给了我错误。那么有什么办法可以让我懒惰地加载我的听众,这样我就不会给我错误了。
  • 通过命令行传递东西并不意味着你不能使用配置属性和一般spring的配置属性机制。你可以这样运行:java -jar myapp.jar --kafka.groupId=someGroupHere
  • groupIdKafkaListener 属性中使用属性占位符。

标签: spring spring-boot apache-kafka kafka-consumer-api spring-kafka


【解决方案1】:
@KafkaListener(... groupId = "${group.id}")

然后在命令行传递-Dgroup.id=myGroup

【讨论】:

  • 它解决了我的组 ID 问题。如果我想从 cmd 提供消费者主题名称,那么我应该使用:@KafkaListener(topics = "${consumer_topic}", groupId = "${group.id}") 并传递 -Dconsumer_topic=topic_name 。这是正确的吗?
  • 如何从命令行传递 kafka.bootstrap-servers 值?现在我正在通过 application.properties。
  • 您可以在命令行上使用--(两个破折号)传递任何启动属性 - 例如java -jar target/ktest26-0.0.1-SNAPSHOT.jar --spring.kafka.bootstrap-servers=localhost:9092
猜你喜欢
  • 2020-11-09
  • 2019-09-01
  • 1970-01-01
  • 2020-04-24
  • 2020-07-10
  • 1970-01-01
  • 2018-03-17
  • 1970-01-01
  • 1970-01-01
相关资源
最近更新 更多