【发布时间】:2019-05-14 20:10:06
【问题描述】:
我正在使用 Kafka 进行一些 POC 工作,并且正在使用事务 API 研究恰好一次的功能,但仍然遇到一些问题。如果消费者在处理消息后但在提交其偏移量之前崩溃会发生什么?似乎下一次运行不可避免地会从错误的消息开始,并且会出现重复的消息。我该如何处理这种情况?
【问题讨论】:
标签: apache-kafka kafka-consumer-api
我正在使用 Kafka 进行一些 POC 工作,并且正在使用事务 API 研究恰好一次的功能,但仍然遇到一些问题。如果消费者在处理消息后但在提交其偏移量之前崩溃会发生什么?似乎下一次运行不可避免地会从错误的消息开始,并且会出现重复的消息。我该如何处理这种情况?
【问题讨论】:
标签: apache-kafka kafka-consumer-api
如果消费者在处理消息后但在提交其偏移量之前崩溃会发生什么?
Kafka: Definitive Guide 提到了一个选项,如果消息处理涉及将消息写入数据库,我们可以将处理后的偏移量也写入数据库,并在恢复阶段通过寻找 (seek()) 偏移量来使用该偏移量我们想从 Kafka 中进行轮询。
也就是说,您可以在每条消息中拥有一个唯一标识符,以便消费者可以检查该消息之前是否已被处理过。 此(重复处理)应始终在消费者代码中实现。
【讨论】: