【发布时间】:2019-05-20 22:59:03
【问题描述】:
我正在开发一个 kafka 消费者 API,它使用来自某个主题的消息。当它使用不正确的消息(比如格式不正确的 JSON 消息)时,我希望应该调用我的错误处理程序来通知支持组对不正确的消息采取一些措施。
但我的错误处理程序不会自动调用。您能否告知我的代码中缺少的内容。
如果我将错误处理程序自动连接到我的侦听器类并显式调用,一切正常。
错误处理类
public class MyErrorHandler implements ErrorHandler, KafkaListenerErrorHandler {
@Override
public Object handleError(Message<?> message, ListenerExecutionFailedException ex, Consumer<?, ?> consumer) {
.....
}
}
消费者配置
@Bean
public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, String>> kafkaManualAckListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerGroupFactory());
//factory.getContainerProperties().setPollTimeout(3000);
//factory.getContainerProperties().setAckMode(AckMode.MANUAL_IMMEDIATE);
//factory.getContainerProperties().setAckOnError(false);
factory.setErrorHandler(new MyErrorHandler());
return factory;
}
@Bean
public MyErrorHandler myErrorHandler() {
return new MyErrorHandler();
}
监听类
@KafkaListener(topics = "${kafka.proposal.topic.name}" , containerFactory = "kafkaManualAckListenerContainerFactory", errorHandler ="${kafka.custom.error.handler}")
public void listen(ConsumerRecord<?,?> cr) {
//Logic to get the message from topic and parse it to json, here i am testing with incorrect messages and producing JsonSyntaxException
}
注意:我的属性文件中的 kafka.custom.error.handler = myErrorHandler。
我希望我的错误处理程序应该被自动调用。但事实并非如此。我是否缺少任何配置。
【问题讨论】:
标签: error-handling apache-kafka kafka-consumer-api spring-kafka