【问题标题】:Kafka Storm Spout: Got fetch request with offset out of rangeKafka Storm Spout:获取偏移量超出范围的获取请求
【发布时间】:2017-03-09 03:05:50
【问题描述】:

我们的 Storm 拓扑中有一个场景,其中 KafkaSpouts 无法使用来自主题的任何消息。 Spout 不断记录相同的 WARN 消息:

获取偏移量超出范围的获取请求

...
2016-10-26 11:11:31.070 o.a.s.k.KafkaUtils [WARN] Partition{host=somehost.org:9092, topic=my-topic, partition=0} Got fetch request with offset out of range: [3]
2016-10-26 11:11:31.078 o.a.s.k.KafkaUtils [WARN] Partition{host=somehost.org:9092, topic=my-topic, partition=0} Got fetch request with offset out of range: [3]
2016-10-26 11:11:31.084 o.a.s.k.KafkaUtils [WARN] Partition{host=somehost.org:9092, topic=my-topic, partition=0} Got fetch request with offset out of range: [3]
2016-10-26 11:11:31.098 o.a.s.k.KafkaUtils [WARN] Partition{host=somehost.org:9092, topic=my-topic, partition=0} Got fetch request with offset out of range: [3]
2016-10-26 11:11:31.104 o.a.s.k.KafkaUtils [WARN] Partition{host=somehost.org:9092, topic=my-topic, partition=0} Got fetch request with offset out of range: [3]
2016-10-26 11:11:31.111 o.a.s.k.KafkaUtils [WARN] Partition{host=somehost.org:9092, topic=my-topic, partition=0} Got fetch request with offset out of range: [3]
...

Spout 被配置为从 zookeeper 读取最后一次提交的偏移量,并且在这种情况下该偏移量大于 Kafka 中最新的消息偏移量。我们也在研究为什么主题偏移会重置。

目前我们通过观察风暴日志中的超出范围警告来解决问题,删除zookeeper偏移条目,然后重新部署拓扑。

【问题讨论】:

  • 您最近是否将您的 spout 指向了一个新的主题名称?您是否有多个具有相同 ID 的 spout?
  • 该问题主要发生在开发环境中,因此我可以看到提交给 zookeeper 的“误导性”偏移量。我希望将 KafkSpout 配置为放弃获取超出范围偏移量的无限循环,并返回到最早或最新的偏移量。

标签: apache-kafka apache-storm apache-zookeeper


【解决方案1】:

就我而言,这是因为我重新创建了 KafkaSpout 订阅的 Kafka 主题。

特定分区的偏移量保存在 Zookeeper 中,如果一个主题被删除然后再次创建,您将不得不手动从 Zookeeper 中删除偏移量信息。

只需打开 Zookeeper CLI,然后删除属于 KafkaSpout 的使用者“组 ID”的“节点”所在的路径。 如需帮助,请参考https://www.tutorialspoint.com/zookeeper/zookeeper_cli.htm

【讨论】:

  • 这是我们在观察场景时所做的工作。删除zookeeper kafka条目,重新部署拓扑。
【解决方案2】:

如果提交了无效的偏移量,则使用客户端配置“auto.offset.reset”。它接受值“最小”和“最大”。如果未设置该值,则会引发异常(如您的情况)。

对于KafkaSpout,您可以通过变量KafkaConfig#startOffsetTime 设置此值,将其设置为kafka.api.OffsetRequest.EarliestTime()kafka.api.OffsetRequest.LatestTime()

http://storm.apache.org/releases/1.0.2/storm-kafka.html

【讨论】:

  • 我们的 Kafka 配置用于 Storm KafkaSpout,我在 Kafka/SpoutConfig 中没有看到类似于“auto.offset.reset”的属性。
  • 更新了我的答案。
  • Latest/EarliestTime() 的行为没有变化。逐步浏览 PartitionManager 和 KafkaUtils 我可以看到返回的最新或最早的偏移量,但 PartitionManger 忽略了它们而支持 zookeeper 提交的值。
  • Kafka消费者客户端的标准行为是使用Latest/Earliest,以防提交的偏移量无效。如果偏移量有效,当然应该使用它,并且最新/最早设置无效。所以客户端总是从它离开的地方恢复(只要提交的偏移量仍然有效)。如果您有一个已提交的偏移量并且想要重置到特定时间点,则需要手动执行此操作,而 KafkaSpout 根本不支持此操作——您需要在外部执行此操作。也许这个问题有帮助:stackoverflow.com/questions/40271847/…
  • 也许 Storm 的 KafkaSpout 作为 Kafka 消费者客户端的实现很糟糕。
猜你喜欢
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 2021-04-03
  • 2016-04-17
  • 2020-09-12
  • 2016-08-30
  • 2013-12-26
  • 1970-01-01
相关资源
最近更新 更多