【问题标题】:My kafka error handler is not getting invoked, should I invoke explicitly?我的 kafka 错误处理程序没有被调用,我应该显式调用吗?
【发布时间】:2019-05-20 22:59:03
【问题描述】:

我正在开发一个 kafka 消费者 API,它使用来自某个主题的消息。当它使用不正确的消息(比如格式不正确的 JSON 消息)时,我希望应该调用我的错误处理程序来通知支持组对不正确的消息采取一些措施。

但我的错误处理程序不会自动调用。您能否告知我的代码中缺少的内容。

如果我将错误处理程序自动连接到我的侦听器类并显式调用,一切正常。

错误处理类

public class MyErrorHandler implements ErrorHandler, KafkaListenerErrorHandler {
@Override
public Object handleError(Message<?> message, ListenerExecutionFailedException ex, Consumer<?, ?> consumer) {
.....
   }
}

消费者配置

@Bean
public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, String>> kafkaManualAckListenerContainerFactory() {
    ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
    factory.setConsumerFactory(consumerGroupFactory());
    //factory.getContainerProperties().setPollTimeout(3000);
    //factory.getContainerProperties().setAckMode(AckMode.MANUAL_IMMEDIATE);
    //factory.getContainerProperties().setAckOnError(false);
    factory.setErrorHandler(new MyErrorHandler());
    return factory;
}

@Bean
public MyErrorHandler myErrorHandler() {
    return new MyErrorHandler();
}

监听类

@KafkaListener(topics = "${kafka.proposal.topic.name}" ,                containerFactory = "kafkaManualAckListenerContainerFactory",                errorHandler ="${kafka.custom.error.handler}")
public void listen(ConsumerRecord<?,?> cr)  {   
     //Logic to get the message from topic and parse it to json, here i am testing with incorrect messages and producing JsonSyntaxException
}

注意:我的属性文件中的 kafka.custom.error.handler = myErrorHandler。

我希望我的错误处理程序应该被自动调用。但事实并非如此。我是否缺少任何配置。

【问题讨论】:

    标签: error-handling apache-kafka kafka-consumer-api spring-kafka


    【解决方案1】:

    如果发生反序列化错误,例如在JsonDeserializer中,它发生在Spring获取消息之前,所以我们不能调用错误处理程序。

    从 2.2 版开始,您可以使用 ErrorHandlingDeserializer2 来包装反序列化程序。这允许框架获取失败的反序列化并将其传递给容器的错误处理程序。

    当反序列化器无法反序列化消息时,Spring 无法处理该问题,因为它发生在 poll() 返回之前。为了解决这个问题,2.2 版本引入了 ErrorHandlingDeserializer2。这个反序列化器委托给一个真正的反序列化器(键或值)。如果委托未能反序列化记录内容,则 ErrorHandlingDeserializer2 在包含原因和原始字节的标头中返回空值和 DeserializationException。当您使用记录级 MessageListener 时,如果 ConsumerRecord 包含键或值的 DeserializationException 标头,则使用失败的 ConsumerRecord 调用容器的 ErrorHandler。记录不会传递给监听器。

    【讨论】:

    • 谢谢 Gary,你能建议一个我的 errorHandler 将被自动调用的案例吗?假设我没有收到 JsonDeserializer 错误。
    • 如果您的监听器抛出异常,监听器适配器将调用KafkaListenerErrorHandler 方法。如果没有侦听器错误处理程序(或者它也抛出异常),则容器将调用ErrorHandler。容器不知道 KafkaListenerErrorHandler - 就它而言,它是侦听器的一部分。
    猜你喜欢
    • 2015-06-03
    • 2019-09-26
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2019-11-19
    • 2020-10-29
    • 1970-01-01
    • 2018-06-02
    相关资源
    最近更新 更多