【问题标题】:How can we pause Kafka consumer polling/processing records when there is an exception because of downstream system当下游系统出现异常时,我们如何暂停 Kafka 消费者轮询/处理记录
【发布时间】:2020-06-19 05:09:18
【问题描述】:

我正在使用 spring boot 2.1.7.RELEASE 和 spring-kafka 2.2.8.RELEASE。并且我正在使用 @KafkaListener 注释来创建消费者,并且我正在使用消费者的所有默认设置。

现在,在我的消费者中,处理逻辑包括一个数据库调用,如果在处理过程中出现错误/异常,我会将记录发送到 DLT。

使用此设置,如果数据库由于某种原因关闭了几分钟,我想暂停/停止我的消费者消费更多记录,否则它会继续消费消息并会得到数据库异常并最终填满我的除非数据库回来(基于一些健康检查),否则我不想做 DLT。

现在我有几个问题。

  1. spring-kafka 是否提供基于异常类型触发无限重试的选项(在本例中为 DB 异常,但我想根据我的消费者逻辑添加更多类型的异常)
    1. spring-kafka 是否提供了基于条件触发消息消费的选项?

【问题讨论】:

    标签: spring-kafka


    【解决方案1】:

    有一个ContainerStoppingErrorHandler,但它会为所有异常停止容器。

    您需要创建一个自定义错误处理程序,在特定故障后停止(或暂停)容器,以及一些重新启动(或恢复)容器的机制。

    【讨论】:

    • 恢复可能是棘手的部分 - 可以停止/暂停以响应处理记录时发生的问题,但一旦停止或暂停,您就没有任何东西会触发检查是否问题已解决,因此您可能需要一个计划任务来检查是否可以恢复。
    • SeekToCurrentExceptionHandler 是否有可能实现无限重试的想法?
    • 不要在 cmets 中就其他答案提出新问题;总是问一个新问题(如果需要,请参考旧答案)。暂停容器时,您可以设置idleEventInterval 容器属性,并使用ApplicationListener bean 或@EventListener 方法来监听空闲容器事件;然后,您可以检查在触发事件时是否可以恢复容器。 STCEH 中的默认 FixedBackOff 将无限重试,延迟 5 秒。
    猜你喜欢
    • 1970-01-01
    • 2017-09-22
    • 2022-10-15
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2017-06-20
    • 1970-01-01
    相关资源
    最近更新 更多