【问题标题】:How do I get a Kafka message before the @KafkaListener method?如何在 @KafkaListener 方法之前获取 Kafka 消息?
【发布时间】:2022-10-24 21:51:15
【问题描述】:

我的任务是在带有@KafkaListner 注释的方法之前获取Kafka 消息,检查其中的correlationId 和requestId 标头。如果它们存在,请将它们刷新到 MDC 或以其他方式生成它们。

我的问题是如何在使用@KafkaListner 的方法之前获取带有标头的Kafka 消息?

【问题讨论】:

  • 你能用那种方法做到吗?

标签: java spring apache-kafka spring-kafka


【解决方案1】:

您可以尝试按照here 的说明编写自己的ConsumerInterceptor
Apache Kafka 提供了一种向生产者和消费者添加拦截器的机制。这些对象由 Kafka 管理,而不是 Spring,因此普通的 Spring 依赖注入将无法用于依赖 Spring Bean 的连接。但是,您可以使用拦截器 config() 方法手动连接这些依赖项。下面的 Spring Boot 应用程序展示了如何通过覆盖 boot 的默认工厂以将一些依赖 bean 添加到配置属性中来做到这一点。

ConsumerFactory 定义:

    @Bean
    public ConsumerFactory<?, ?> kafkaConsumerFactory(SomeBean someBean) {
        Map<String, Object> consumerProperties = new HashMap<>();
        // consumerProperties.put(..., ...)
        // ...
        consumerProperties.put(ConsumerConfig.INTERCEPTOR_CLASSES_CONFIG, MyConsumerInterceptor.class.getName());
        consumerProperties.put("some.bean", someBean);
        return new DefaultKafkaConsumerFactory<>(consumerProperties);
    }

拦截器定义:

public class MyConsumerInterceptor implements ConsumerInterceptor<String, String> {

    private SomeBean bean;

    @Override
    public void configure(Map<String, ?> configs) {
        this.bean = (SomeBean) configs.get("some.bean");
    }

    @Override
    public ConsumerRecords<String, String> onConsume(ConsumerRecords<String, String> records) {
        this.bean.someMethod("consumer interceptor");
        return records;
    }

    @Override
    public void onCommit(Map<TopicPartition, OffsetAndMetadata> offsets) {
    }

    @Override
    public void close() {
    }

}

【讨论】:

  • 如果您想通过 Spring 完成所有操作,还可以将 RecordInterceptor 添加到侦听器容器工厂。它将一次获得一个记录。
【解决方案2】:

RecordInterceptor 添加到侦听器容器(或创建它的工厂)。

【讨论】:

    猜你喜欢
    • 2021-10-26
    • 2019-10-28
    • 1970-01-01
    • 1970-01-01
    • 2021-09-20
    • 1970-01-01
    • 2020-06-19
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多