【发布时间】:2021-10-19 05:42:41
【问题描述】:
我正在尝试在我的 spring kafka 监听器中访问各种 KafkaHeaders 属性:
这是我获取 KafkaListenerContainerFactory 的代码 :
public KafkaListenerContainerFactory<ConcurrentMessageListener<String,GenericRecord>> kafkaListenerContainerFactory(final KafkaProperties kafkaProperties){
final ConcurrentKafkaListenerContainerFactory<String,GenericRecord> factory=new ConcurrentKafkaListenerContainerFactory<>();
factory.setBatchErrorHandler(new BatchLoggingErrorHandler());
factory.setBatchListener(true);
return factory;
}
我的监听代码:
@KafkaListener(topic="testTopic",groupId="testGroupId"){
public void consumeNotification(@Payload Request request, @Header(KafkaHeaders.RECORD_METADATA) RecordMetadata recordMetaData){
}
But I am getting exception in my Listener:
Missing header 'kafka_recordMetadata' for method parameterType[class org.apache.kafka.clients.producer.RecordMetadata]
对于所有标头属性,我都收到此错误...与生产者有关吗?
KafkaHeaders 中的所有属性都给我带来了问题。一切都在抛出 MissingHeader 错误。
【问题讨论】:
-
如果不使用批处理监听选项,它是如何工作的?
-
@Artem Bilan,我也试过了..将错误处理程序设置为 seektoerrorHandler。并且批处理侦听器为假..然后也是同样的错误。
标签: java spring apache-kafka spring-kafka