【问题标题】:How to handle kafka consumer failures如何处理kafka消费者失败
【发布时间】:2020-10-29 13:39:48
【问题描述】:

我正在尝试了解如何处理失败的消费者记录。如何 我们知道有记录失败。我看到的是当记录 消费者处理失败,运行时异常消费者是 继续重试。但是当下一条记录可用于处理它时 正在提交最新记录的偏移量,这是预期的。我的 质疑我们如何知道失败的记录。在旧消息中 系统失败的消息回滚到队列并停止处理 那里。然后我们知道队列已关闭,我们可以采取行动。

我可以将失败的记录记录到某个数据库表中,但是如果记录失败会发生什么? 我可以将失败移动到错误/死信队列,如果移动失败又会发生什么?

我正在使用带有 spring boot 2.3.4 的 kafka 2.6。任何帮助将不胜感激

【问题讨论】:

    标签: apache-kafka


    【解决方案1】:

    听起来您需要禁用自动提交并在达到“成功处理”的范围时自己手动提交偏移量。如果您包含像数据库这样的外部进程,那么您还需要增加 Kafka 客户端超时,这样它就不会认为消费者在等待错误记录/处理时已经死亡。

    【讨论】:

    • 关闭自动提交的事件,它将在失败后处理下一条记录并提交下一个偏移量。自动提交关闭不会在失败后停止消费消息。我对吗?又是我的问题,我们如何知道失败的记录?
    • 如果禁用提交,则从轮询中丢弃剩余记录(提交成功记录偏移后),然后再次轮询将重试失败的记录
    • 感谢您的快速回复。那应该对我有用。我正在使用带有 kafka 监听器的 spring boot 2.3.4。我刚开始还在学习卡夫卡。您能否帮助如何从失败的记录中启动 kafka 侦听器。看起来我需要在失败后动态启动监听器。例如,如果一条记录上的数据库连接失败,那么我将停止从那里提交偏移量。当连接问题恢复后,我如何在不更改代码的情况下启动监听器。我需要管理客户端 API 来启动这样的 API?
    • 就像我说的,在下一次轮询中,它将从任何未提交的偏移量开始,在这种情况下,它将是最近失败的记录。我不使用 Spring,但我认为 KafkaListener 应该持续运行
    猜你喜欢
    • 2020-12-18
    • 1970-01-01
    • 2018-01-27
    • 1970-01-01
    • 2018-09-29
    • 2017-08-13
    • 1970-01-01
    • 2020-12-07
    • 1970-01-01
    相关资源
    最近更新 更多