【发布时间】:2022-10-24 21:51:15
【问题描述】:
我的任务是在带有@KafkaListner 注释的方法之前获取Kafka 消息,检查其中的correlationId 和requestId 标头。如果它们存在,请将它们刷新到 MDC 或以其他方式生成它们。
我的问题是如何在使用@KafkaListner 的方法之前获取带有标头的Kafka 消息?
【问题讨论】:
-
你能用那种方法做到吗?
标签: java spring apache-kafka spring-kafka
我的任务是在带有@KafkaListner 注释的方法之前获取Kafka 消息,检查其中的correlationId 和requestId 标头。如果它们存在,请将它们刷新到 MDC 或以其他方式生成它们。
我的问题是如何在使用@KafkaListner 的方法之前获取带有标头的Kafka 消息?
【问题讨论】:
标签: java spring apache-kafka spring-kafka
您可以尝试按照here 的说明编写自己的ConsumerInterceptor。
Apache Kafka 提供了一种向生产者和消费者添加拦截器的机制。这些对象由 Kafka 管理,而不是 Spring,因此普通的 Spring 依赖注入将无法用于依赖 Spring Bean 的连接。但是,您可以使用拦截器 config() 方法手动连接这些依赖项。下面的 Spring Boot 应用程序展示了如何通过覆盖 boot 的默认工厂以将一些依赖 bean 添加到配置属性中来做到这一点。
ConsumerFactory 定义:
@Bean
public ConsumerFactory<?, ?> kafkaConsumerFactory(SomeBean someBean) {
Map<String, Object> consumerProperties = new HashMap<>();
// consumerProperties.put(..., ...)
// ...
consumerProperties.put(ConsumerConfig.INTERCEPTOR_CLASSES_CONFIG, MyConsumerInterceptor.class.getName());
consumerProperties.put("some.bean", someBean);
return new DefaultKafkaConsumerFactory<>(consumerProperties);
}
拦截器定义:
public class MyConsumerInterceptor implements ConsumerInterceptor<String, String> {
private SomeBean bean;
@Override
public void configure(Map<String, ?> configs) {
this.bean = (SomeBean) configs.get("some.bean");
}
@Override
public ConsumerRecords<String, String> onConsume(ConsumerRecords<String, String> records) {
this.bean.someMethod("consumer interceptor");
return records;
}
@Override
public void onCommit(Map<TopicPartition, OffsetAndMetadata> offsets) {
}
@Override
public void close() {
}
}
【讨论】:
RecordInterceptor 添加到侦听器容器工厂。它将一次获得一个记录。
将RecordInterceptor 添加到侦听器容器(或创建它的工厂)。
【讨论】: