【发布时间】:2016-08-25 11:01:32
【问题描述】:
我是 kafka 新手,在阅读文档时,我遇到了这个与 kafka 消费者相关的设计相关问题。
kafka 消费者从组成的 kafka 流中读取消息 来自一台或多台服务器的一个或多个分区。
假设其中一条传入消息已损坏,因此消费者无法处理。但是在处理事件日志时,您不想丢弃任何事件,因此您会进行无限重试以避免处理过程中出现瞬时错误。在这种无限重试的情况下,消费者如何前进。有没有办法将此消息列入黑名单以便下次重试?
我认为它需要人工干预。我们在哪里记录一些消息元数据(还不知道到底是什么)以查看哪条消息失败了,并且每个消费者在 n 次重发后检查 redis(或其他地方?)以查看是否需要跳过该消息.黑名单也不必永远存储在redis中,直到消费者可以跳过它。这是我刚才描述的伪代码:
while (errorState) {
if (msg in blacklist) {
//skip
commitOffset()
} else {
errorState = processMessage(msg);
if (!errorState) {
commitOffset();
} else {
// log this msg so that we can add to blacklist
logger.info(msg)
}
}
}
我想听听更有经验的人的意见,看看是否有更好的方法来做到这一点。
【问题讨论】:
-
能否提供您的消费者代码?不太清楚
infinite retries是什么意思。 -
@DavidGriffin 表示您重试,直到您能够处理它。用伪代码更新了描述
标签: apache-kafka kafka-consumer-api