【问题标题】:Filter messages before deserialization based on headers根据标头在反序列化之前过滤消息
【发布时间】:2019-05-19 11:29:41
【问题描述】:

有时可以根据标头值在反序列化之前过滤掉消息。使用spring kafka的这个场景是否有任何现有模式。我正在考虑实现类似于 ErrorHandlingDeserializer 除了委托将过滤谓词也作为属性。有什么建议?谢谢。

【问题讨论】:

    标签: spring-boot spring-kafka


    【解决方案1】:

    是的,您可以使用 ErrorHandlingDeserializer 使用的相同技术来返回“标记”对象而不是进行反序列化,然后添加一个 RecordFilterStrategy 过滤具有此类对象的记录,到侦听器(容器工厂当使用@KafkaListener 或为显式侦听器使用过滤适配器时)。

    编辑

    Spring Boot 和添加过滤器...

        @Bean
        public ConcurrentKafkaListenerContainerFactory<?, ?> kafkaListenerContainerFactory(
                ConcurrentKafkaListenerContainerFactoryConfigurer configurer,
                ConsumerFactory<Object, Object> kafkaConsumerFactory) {
            ConcurrentKafkaListenerContainerFactory<Object, Object> factory = new ConcurrentKafkaListenerContainerFactory<>();
            configurer.configure(factory, kafkaConsumerFactory);
            kafkaConsumerFactory.setRecordFilterStrategy(myFilter());
            return factory;
        }
    

    【讨论】:

    • 是否有计划将记录过滤策略和容器错误处理程序添加为属性。
    • 您的问题未标记spring-boot;如果您的意思是作为引导自动配置使用的引导属性公开,那么如果上下文中有一个 ErrorHandler bean,则它被连接到容器工厂。过滤策略目前没有自动配置,你可以尝试打开一个新的功能来防止启动,但是很容易覆盖 Boot 的自动配置工厂@Bean 来设置额外的属性。如果您不是指 Spring Boot,那么我不确定您的意思。
    • 我编辑了答案以展示如何轻松添加过滤器。
    • 感谢加里。是的,我的意思是 Spring Boot 自动配置的 bean。我们的计划与您的建议相同。 (只是尝试使用属性和自动配置的 bean 进行管理,因为我们有许多具有不同属性配置文件的服务)。将用 spring-boot 标记
    • 我相信他们会考虑拉取请求;它与错误处理程序的注入方式应该没有太大区别。也就是说,对于具有多个侦听器的应用程序可能没有意义,因为每个侦听器可能需要不同的过滤器,因此每个侦听器都需要一个自定义容器工厂。也许更好的解决方案是要求我们(spring-kafka)直接向@KafkaListener注解添加过滤策略;再次,PR 将不胜感激 :)
    猜你喜欢
    • 2022-11-19
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2016-03-31
    • 2021-10-30
    • 1970-01-01
    • 2017-01-16
    • 1970-01-01
    相关资源
    最近更新 更多