【问题标题】:Kafka Connect Offsets. Get/Set?卡夫卡连接偏移量。获取/设置?
【发布时间】:2016-10-09 14:23:10
【问题描述】:

如何获取、设置或重置 Kafka Connect 连接器/任务/接收器的偏移量?

我可以使用运行kafka.admin.ConsumerGroupCommand/usr/bin/kafka-consumer-groups 工具来查看我所有常规Kafka 消费者组的偏移量。但是,Kafka Connect 任务和组不使用此工具显示。

同样,我可以使用 zookeeper-shell 连接到 Zookeeper,我可以看到常规 Kafka 消费者组的 zookeeper 条目,但不能看到 Kafka Connect sinks。

【问题讨论】:

  • 作为一种(不好的)解决方法,您可以删除连接器并以不同的名称注册一个新连接器。显然,这仅在您不必定期执行此操作时才有意义。
  • This 很好地解释了如何修改组的偏移量。

标签: apache-kafka


【解决方案1】:

您不能设置偏移量,但可以使用kafka-consumer-groups.sh 工具“滚动”前馈。

您的连接器的消费者组的名称为connect-*CONNECTOR NAME*,但您可以仔细检查:

unset JMX_PORT; ./bin/kafka-consumer-groups.sh --bootstrap-server *KAFKA HOSTS* --list

查看当前偏移量:

unset JMX_PORT; ./bin/kafka-consumer-groups.sh --bootstrap-server *KAFKA HOSTS* --group connect-*CONNECTOR NAME* --describe

向前移动偏移量:

unset JMX_PORT; ./bin/kafka-console-consumer.sh --bootstrap-server *KAFKA HOSTS* --topic *TOPIC* --max-messages 10000 --consumer-property group.id=connect-*CONNECTOR NAME* > /dev/null

我想你也可以通过首先删除消费者组来向后移动偏移量,使用--delete 标志。

不要忘记通过 Kafka Connect REST API 暂停和恢复连接器。

【讨论】:

  • 即使在暂停连接器后,尝试删除连接使用者组也会导致此错误:“无法删除组,原因是:NON_EMPTY_GROUP”
  • @emirhosseini 感谢您的警告,但此答案的目的不是展示如何在所有可能的情况下删除消费者组。
  • 此答案适用于旧版本的 Kafka。从 2.2.0 开始,这不起作用
【解决方案2】:

在我的情况下(测试读取文件到生产者并在控制台中消费,全部都在本地),我只是在生产者输出中看到了这个:

offset.storage.file.filename=/tmp/connect.offsets

所以我想打开它,但它是二进制的,有一些难以识别的字符。

我删除了它(重命名它也可以),然后我可以写入同一个文件并再次从消费者那里获取文件内容。 您必须重新启动控制台生产者才能生效,因为它会尝试读取偏移量文件,如果不存在,则创建一个新的,以便重置偏移量。

如果你想在不删除的情况下重置它,你可以使用:

bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --group <group-name> --reset-offsets --to-earliest --topic <topic_name>

您可以通过以下方式检查所有组名:

bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --list

并检查每个组的详细信息:

bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --group <group_name> --describe

在生产环境中,这个偏移量是由 zookeeper 管理的,所以需要更多的步骤(和谨慎)。你可以参考这个页面:

https://metabroadcast.com/blog/resetting-kafka-offsets https://community.hortonworks.com/articles/81357/manually-resetting-offset-for-a-kafka-topic.html

步骤:

kafka-topics --list --zookeeper localhost:2181
kafka-run-class kafka.tools.GetOffsetShell --broker-list localhost:9092 -topic vital_signs --time -1 // -1 for largest, -2 for smallest

set /consumers/{yourConsumerGroup}/offsets/{yourFancyTopic}/{partitionId} {newOffset}

【讨论】:

  • Kafka connect 不在 zookeeper 中存储偏移量。这是针对旧的(0.8)kafka 客户的。所以这并不能回答问题。
【解决方案3】:

从 0.10.0.0 开始,Connect 不提供用于管理偏移量的 API。这是我们希望在未来改进的东西,但还没有。 ConsumerGroupCommand 将是管理 Sink 连接器偏移的正确工具。请注意,源连接器偏移量存储在 Connect 的特殊偏移量主题中(它们与普通的 Kafka 偏移量不同,因为它们是由源系统定义的,请参阅worker configuration docs 中的offset.storage.topic)并且因为接收器连接器使用新的消费者,他们不会将他们的偏移存储在 Zookeeper 中——所有现代客户端都使用基于 Kafka 的原生偏移存储。 ConsumerGroupCommand 可以使用这些偏移量,您只需传递 --new-consumer 选项)。

【讨论】:

猜你喜欢
  • 2018-04-04
  • 2020-06-04
  • 1970-01-01
  • 2019-05-18
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
相关资源
最近更新 更多