【问题标题】:C++ kafka client (rdkafka)C++ kafka 客户端 (rdkafka)
【发布时间】:2019-10-28 18:18:10
【问题描述】:

我正在使用来自的 c++ kafka 实现 C++ rdkafka 。 问题是关于 RdKafka::KafkaConsumer。如何将消费者设置为从主题的开头开始?

附: 链接中的消费者示例基于 RdKafka::Consumer 标记为“仅限旧版,请改用 KafkaConsumer”

谢谢

【问题讨论】:

  • 你试过“conf->set("auto.offset.reset", "smallest", errstr)" 还是 "conf->set("auto.offset.reset", "earliest" , errstr)"(适用于较新的 Kafka 版本)
  • 是的。但这没有帮助。当我重新启动消费者时,我希望从头开始阅读所有消息(即使是我在上一个消费者应用程序崩溃之前提交的消息)
  • 您的问题解决了吗?如果是怎么办?
  • 这个有什么消息吗?
  • @GianLorenzoMeocci 请参阅下面的新答案。

标签: c++ apache-kafka kafka-consumer-api


【解决方案1】:

“auto.offset.reset”不是这样工作的。 “auto.offset.reset”只有在没有有效的提交偏移时才有效。流程是这样的:

  1. 启动消费者(重启或崩溃后)
  2. 寻找偏移量
    • 如果找到,从偏移恢复
    • 如果没有找到,根据auto.offset.reset设置偏移量。

如果您想在每次重新启动时阅读整个主题,实际上根本没有理由提交偏移量。提交偏移量的目的是为了知道你离开了哪里,因为你想在重启后从这个偏移量恢复。

【讨论】:

  • 好的,谢谢。实施客户后期加入的最佳实践是什么? (例如,我生成了一条配置消息,然后许多消费者应该在自己的时间和计划中阅读它)我想不提交会起作用,实际上我还认为使用 group.id = GUID 创建每个消费者会产生相同的行为,但是它没有。
  • 我不太明白。你到底想做什么?似乎是一个新的问题思想。为了保持清洁,您可能需要提出一个新问题。
  • @MatthiasJ.Sax 我也遇到了这种行为。我正在使用 auto.offset.reset= false 和一个新的随机生成的 group.id 没有消费者提交仍然没有帮助。消费者总是阅读最新消息。
  • auto.offset.reset 的值为“最早”或“最新”或“无”(参见 kafka.apache.org/documentation/#consumerconfigs)——对于 librdkafka,“无”是“错误”(参见 github.com/edenhill/librdkafka/blob/master/src/…
  • 哦,对不起,我的错误。我正在使用“最早”。仍然只阅读新消息。我发现的唯一解决方法是使用 rebalance_cb 并设置 assignment[0].offset = 0;但是如果我有超过 1 个分区怎么办?我应该将它们全部设置为 0 吗?
【解决方案2】:

我为此浪费了几个小时,答案是,通常情况下,RTFM :) 是的,您需要将属性 auto.offset.reset 的值设置为 smallest,但关键问题是——在哪里?

来自this link

高级 Kafka 消费者(C++ 中的 KafkaConsumer)将启动 默认在最后提交的偏移处消费,如果没有 先前提交的主题+分区和组的偏移量,它将 依靠 topic 配置属性 auto.offset.reset 默认为最新,因此在结束时开始消费 分区(只会消费新消息)。

注意粗体字,我做错的是调用这个:

rd_kafka_conf_set(_conf_handle, key, val, _errstr, sizeof(_errstr));

而不是这个:

rd_kafka_topic_conf_set(_topic_conf_handle, key, val, _errstr, sizeof(_errstr));

【讨论】:

    猜你喜欢
    • 1970-01-01
    • 2019-08-20
    • 2017-04-03
    • 1970-01-01
    • 2016-12-18
    • 2017-05-13
    • 2017-08-04
    • 1970-01-01
    • 2020-05-19
    相关资源
    最近更新 更多