【问题标题】:How can a kafka consumer doing infinite retires recover from a bad incoming message?无限退休的 kafka 消费者如何从糟糕的传入消息中恢复过来?
【发布时间】:2016-08-25 11:01:32
【问题描述】:

我是 kafka 新手,在阅读文档时,我遇到了这个与 kafka 消费者相关的设计相关问题。

kafka 消费者从组成的 kafka 流中读取消息 来自一台或多台服务器的一个或多个分区。

假设其中一条传入消息已损坏,因此消费者无法处理。但是在处理事件日志时,您不想丢弃任何事件,因此您会进行无限重试以避免处理过程中出现瞬时错误。在这种无限重试的情况下,消费者如何前进。有没有办法将此消息列入黑名单以便下次重试?

我认为它需要人工干预。我们在哪里记录一些消息元数据(还不知道到底是什么)以查看哪条消息失败了,并且每个消费者在 n 次重发后检查 redis(或其他地方?)以查看是否需要跳过该消息.黑名单也不必永远存储在redis中,直到消费者可以跳过它。这是我刚才描述的伪代码:

while (errorState) {
       if (msg in blacklist) {
           //skip
           commitOffset()
       } else {      
            errorState = processMessage(msg);       
            if (!errorState) {
                 commitOffset();
            } else {
                 // log this msg so that we can add to blacklist
                 logger.info(msg)
            }
        }
}

我想听听更有经验的人的意见,看看是否有更好的方法来做到这一点。

【问题讨论】:

  • 能否提供您的消费者代码?不太清楚infinite retries 是什么意思。
  • @DavidGriffin 表示您重试,直到您能够处理它。用伪代码更新了描述

标签: apache-kafka kafka-consumer-api


【解决方案1】:

我们的项目中有一个需求,即处理传入消息以更新记录取决于存在的记录。由于某些竞争条件,有时更新在插入之前到达。在这种情况下,我们实施了几种方法。

A.以预定义的延迟手动重试。代码检查插入是否已到达。如果是这样,处理将照常进行。否则,它将休眠 500 毫秒,然后重试。这将重复 10 次。最后,如果消息仍未处理,代码会记录消息,提交偏移量并继续前进。消息的处理总是在池中的线​​程中完成,因此它也不会阻塞主线程。但是,在最坏的情况下,每条消息都需要 5 秒的应用时间。

B.最近,我们对上述方案进行了细化,使用了基于 kafka 的消息调度器。因此,现在如果在更新之前插入尚未到达,系统会将其发送到在 kafka 上运行的单独调度程序。此调度程序将在一段时间后重播消息。重试 3 次后,我们再次记录消息并停止调度或重试。这给我们带来了不阻塞应用程序线程并管理我们何时想要再次重播消息的好处。

【讨论】:

    猜你喜欢
    • 1970-01-01
    • 2017-06-19
    • 1970-01-01
    • 2022-08-16
    • 2019-07-01
    • 1970-01-01
    • 2018-10-13
    • 1970-01-01
    • 2017-09-23
    相关资源
    最近更新 更多