【发布时间】:2019-05-19 11:29:41
【问题描述】:
有时可以根据标头值在反序列化之前过滤掉消息。使用spring kafka的这个场景是否有任何现有模式。我正在考虑实现类似于 ErrorHandlingDeserializer 除了委托将过滤谓词也作为属性。有什么建议?谢谢。
【问题讨论】:
有时可以根据标头值在反序列化之前过滤掉消息。使用spring kafka的这个场景是否有任何现有模式。我正在考虑实现类似于 ErrorHandlingDeserializer 除了委托将过滤谓词也作为属性。有什么建议?谢谢。
【问题讨论】:
是的,您可以使用 ErrorHandlingDeserializer 使用的相同技术来返回“标记”对象而不是进行反序列化,然后添加一个 RecordFilterStrategy 过滤具有此类对象的记录,到侦听器(容器工厂当使用@KafkaListener 或为显式侦听器使用过滤适配器时)。
编辑
Spring Boot 和添加过滤器...
@Bean
public ConcurrentKafkaListenerContainerFactory<?, ?> kafkaListenerContainerFactory(
ConcurrentKafkaListenerContainerFactoryConfigurer configurer,
ConsumerFactory<Object, Object> kafkaConsumerFactory) {
ConcurrentKafkaListenerContainerFactory<Object, Object> factory = new ConcurrentKafkaListenerContainerFactory<>();
configurer.configure(factory, kafkaConsumerFactory);
kafkaConsumerFactory.setRecordFilterStrategy(myFilter());
return factory;
}
【讨论】:
ErrorHandler bean,则它被连接到容器工厂。过滤策略目前没有自动配置,你可以尝试打开一个新的功能来防止启动,但是很容易覆盖 Boot 的自动配置工厂@Bean 来设置额外的属性。如果您不是指 Spring Boot,那么我不确定您的意思。
@KafkaListener注解添加过滤策略;再次,PR 将不胜感激 :)