【问题标题】:How to handler errors/exceptions while using Spring Kafka framework?使用 Spring Kafka 框架时如何处理错误/异常?
【发布时间】:2020-09-04 22:42:54
【问题描述】:

我无法找到如何为 spring kafka 消费者进行自定义错误处理。

我的要求是:

  1. 对于任何反序列化错误,只需将错误和消息写入数据库即可。
  2. 对于@KafkaListener方法下执行的任何错误,重试3次,然后将错误和消息写入数据库。

从 spring 文档中,我发现, 对于 1,我将不得不使用 ErrorHandlingDeserializer,然后它将调用 @KafkaListener 错误处理程序。 对于 2,框架提供了处理消息重试的SeekToCurrentErrorHandler

我不明白除了启用配置的重试之外,我还可以在哪里添加将异常/消息写入数据库的代码。

【问题讨论】:

    标签: java error-handling apache-kafka spring-kafka


    【解决方案1】:

    SeekToCurrentErrorHandler添加恢复器

    new SeekToCurrentErrorHandler((rec, ex) -> {
        Throwable cause = ex.getCause();
        if (cause instanceof DeserializationException) {
            ...
        }
        else {
            ...
        }, new FixedBackOff(2000L, 2L));
    

    默认不重试反序列化异常;大多数其他人在调用恢复器之前都会重试。

    【讨论】:

    • 这里我没有得到 DeserializationException,它被包裹在 ListenerExecutionFailedException 中。我必须先将异常转换为 ListenerExecutionFailedException,然后检查它是否包含 DeserializationException 或其他类型。
    • 对不起,是的;但是您实际上不必投射它;所有Throwables 都有cause()
    猜你喜欢
    • 2021-04-12
    • 1970-01-01
    • 2020-05-27
    • 1970-01-01
    • 1970-01-01
    • 2011-10-13
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多