【问题标题】:Hooks in Kafka ListenerKafka Listener 中的钩子
【发布时间】:2020-01-22 07:09:35
【问题描述】:

在 kafka 收听消息之前/之后是否有任何可用的钩子?

用例: 必须设置 MDC 关联 ID 以执行日志可追溯性

我在寻找什么? 一个 before/after 回调方法,以便可以在进入时设置 MDC 关联 ID,并最终在退出时清除 MDC。

修改后的场景: 我正在获取作为 Kafka Headers 的一部分的共同关系 ID,并且我想在 Kafka Listener 中收到消息后立即在 MDC 中设置相同的 ID

感谢帮助

【问题讨论】:

    标签: spring apache-kafka slf4j spring-kafka mdc


    【解决方案1】:

    您可以向您的侦听器 bean 添加环绕建议...

    @SpringBootApplication
    public class So59854374Application {
    
        public static void main(String[] args) {
            SpringApplication.run(So59854374Application.class, args);
        }
    
        @Bean
        public static BeanPostProcessor bpp() { // static is important
            return new BeanPostProcessor() {
    
                @Override
                public Object postProcessBeforeInitialization(Object bean, String beanName) throws BeansException {
                    if (bean instanceof MyListener) {
                        ProxyFactoryBean pfb = new ProxyFactoryBean();
                        pfb.setTarget(bean);
                        pfb.addAdvice(new MethodInterceptor() {
    
                            @Override
                            public Object invoke(MethodInvocation invocation) throws Throwable {
                                try {
                                    System.out.println("Before");
                                    return invocation.proceed();
                                }
                                finally {
                                    System.out.println("After");
                                }
                            }
    
                        });
                        return pfb.getObject();
                    }
                    return bean;
                }
    
            };
        }
    
        @Bean
        public NewTopic topic() {
            return TopicBuilder.name("so59854374").partitions(1).replicas(1).build();
        }
    
        @Bean
        public ApplicationRunner runner(KafkaTemplate<String, String> template) {
            return args -> template.send("so59854374", "foo");
        }
    
    }
    
    @Component
    class MyListener {
    
        @KafkaListener(id = "so59854374", topics = "so59854374")
        public void listen(String in) {
            System.out.println(in);
        }
    
    }
    

    Before
    foo
    After
    

    编辑

    如果您将 @Header("myMdcHeader") byte[] mdc 作为附加参数添加到您的 kafka 侦听器方法,则可以在调用时使用 getArguments()[1]

    另一种解决方案是将RecordInterceptor 添加到侦听器容器工厂,它允许您在将原始ConsumerRecord 传递给侦听器适配器之前访问它。

    /**
     * An interceptor for {@link ConsumerRecord} invoked by the listener
     * container before invoking the listener.
     *
     * @param <K> the key type.
     * @param <V> the value type.
     *
     * @author Gary Russell
     * @since 2.2.7
     *
     */
    @FunctionalInterface
    public interface RecordInterceptor<K, V> {
    
        /**
         * Perform some action on the record or return a different one.
         * If null is returned the record will be skipped.
         * @param record the record.
         * @return the record or null.
         */
        @Nullable
        ConsumerRecord<K, V> intercept(ConsumerRecord<K, V> record);
    
    }
    
    /**
     * Set an interceptor to be called before calling the listener.
     * Does not apply to batch listeners.
     * @param recordInterceptor the interceptor.
     * @since 2.2.7
     */
    public void setRecordInterceptor(RecordInterceptor<K, V> recordInterceptor) {
        this.recordInterceptor = recordInterceptor;
    }
    

    如果您使用的是批处理侦听器,Kafka 会提供ConsumerInterceptor

    【讨论】:

    • 这里的问题是,我们无法设置运行时关联ID。这是错误
    • 不清楚你的意思;请通过编辑问题来澄清。方法参数在invocation 中可用。
    猜你喜欢
    • 1970-01-01
    • 2018-10-26
    • 1970-01-01
    • 1970-01-01
    • 2020-04-24
    • 2021-12-31
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多