【发布时间】:2020-11-30 23:59:02
【问题描述】:
我想通过命令行参数提供组 ID,但是当我尝试这样做时出现以下错误。
Failed to start bean 'org.springframework.kafka.config.internalKafkaListenerEndpointRegistry'; nested exception is java.lang.IllegalStateException: No group.id found in consumer config, container properties, or @KafkaListener annotation; a group.id is required when group management is used.
这意味着在加载 kafkalistener 时它需要 group_id。如果我在 consumerConfig 文件中提供了 groupId,那么它可以正常工作。
那么有什么方法可以让我通过命令行提供组 ID 并延迟加载 kafka 侦听器,这样我在程序启动时就不需要了。
我的消费者配置:
@Configuration
class KafkaConsumerConfig {
@Value("${kafka.bootstrap-servers}")
private String bootstrapServers;
@Autowired
private ArgumentModel argumentModel;
private Logger logger = LoggerFactory.getLogger(KafkaConsumerConfig.class);
@Bean
public Map<String, Object> consumerConfigs() {
Map<String, Object> props = new HashMap<>();
logger.info("bootstrapServers : {}", bootstrapServers);
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
props.put(ConsumerConfig.GROUP_ID_CONFIG, argumentModel.getKafkaGroupId());
return props;
}
@Bean
public ConsumerFactory<String, String> consumerFactory() {
return new DefaultKafkaConsumerFactory<>(consumerConfigs());
}
@Bean
public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, String>> kafkaListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory());
return factory;
}
}
【问题讨论】:
-
什么是 ArgumentModel?是您的自定义代码吗?如果是这样,为什么不使用可以注入consumerConfigs bean的ConfigurationProperties?
-
在那种情况下它的工作,但我想通过命令行。因此,当程序启动时,我读取 kafkaGroupId 的值并将其保存到模型中,然后在这里使用它。但是当我启动spring程序时,它直接给了我错误。那么有什么办法可以让我懒惰地加载我的听众,这样我就不会给我错误了。
-
通过命令行传递东西并不意味着你不能使用配置属性和一般spring的配置属性机制。你可以这样运行:
java -jar myapp.jar --kafka.groupId=someGroupHere -
在
groupIdKafkaListener 属性中使用属性占位符。
标签: spring spring-boot apache-kafka kafka-consumer-api spring-kafka