【问题标题】:Kafka consumer is processing all messages at startupKafka 消费者在启动时处理所有消息
【发布时间】:2021-10-11 14:56:36
【问题描述】:

我是 Kafka 的新手,正在开发一个包含一些服务的个人项目,它们之间的通信是通过 Kafka 进行的,我正在使用 Confluent 远程托管 Kafka。

一切正常,但是当我启动服务器时,它会尝试处理在我测试系统时生成的主题中的所有旧消息。

我想避免这种情况,因为这很耗时,并且在服务器上次启动时已经处理了这些消息。在开发环境中有什么方法可以防止这种情况发生吗?

我是否正确使用了 Kafka?有没有我错过的好习惯?

【问题讨论】:

    标签: apache-kafka


    【解决方案1】:

    “服务器”是指消费者。 broker 服务器不处理数据,只存储它。

    如果您有 auto.offset.reset=earliest + enable.auto.commit=false + 没有提交代码中的记录(或者每次都使用新的 group.id),这是预期的行为,因为您的 group.id 尚未跟踪消耗的数据。

    由于您现在处于已处理数据但未存储偏移量的情况,请先设置静态组 ID,然后您的选项包括

    • 再次重新处理所有数据,接受重复数据,也许在您的消费者代码中添加一些条件过滤器以跳过记录
    • 跳过所有已处理和未处理的数据,仅在消费者开始后才开始消费全新的记录,方法是设置新的group.id + auto.offset.reset=latest,或使用consumer.seekToEnd()/kafka-consumer-groups CLI 工具;设置auto.offset.reset=latest的缺点是你可能会遇到consumer group空闲太久,group过期了,导致你回到话题的最后,虽然可能还有未处理的数据
    • 手动查找最后处理数据的所有分区的偏移量和consumer.seek() 到这些偏移量

    【讨论】:

    • 是的,实际上我每次都在更改组 ID。最好的做法是什么,更改组 ID + auto.offset.reset=latest,还是保持不变?
    • 每次使用新的都会污染集群元数据。如果您在消费者中禁用/不自动提交,则不需要这样做。如果您确实提交了消息,那么只有在您保持相同的情况下,才会在重新启动之间跟踪进度,这听起来就像您想要的那样。或者如果你总是想从话题末尾开始,可以在轮询之前调用消费者的 seekToEnd 方法
    猜你喜欢
    • 2016-04-13
    • 2020-12-18
    • 1970-01-01
    • 2020-11-11
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2017-02-09
    相关资源
    最近更新 更多