【问题标题】:Kafka committed and last offsets using admin APIKafka 使用管理 API 提交和最后偏移量
【发布时间】:2021-11-15 04:11:11
【问题描述】:

我正在使用管理客户端 API 查询 kafka 代理,以使用以下代码获取 CONSUMER_GROUP 的提交偏移量:

Map<TopicPartition, OffsetAndMetadata> offsets =
                admin.listConsumerGroupOffsets(CONSUMER_GROUP)
                        .partitionsToOffsetAndMetadata().get();

上面的代码将触发对一个特殊创建的 __consumer_offsets 主题的查询,以获取 CONSUMER_GROUP 负责的主题分区的每个分区的提交偏移量。

另一方面,我正在使用下面的代码来检索 CONSUMER_GROUP 的每个主题分区的最新(结束)偏移量

for(TopicPartition tp: offsets.keySet()) {
requestLatestOffsets.put(tp, OffsetSpec.latest());
}

Map<TopicPartition, ListOffsetsResult.ListOffsetsResultInfo> latestOffsets =
admin.listOffsets(requestLatestOffsets).all().get();

for (Map.Entry<TopicPartition, OffsetAndMetadata> e: offsets.entrySet()) {
long latestOffset = latestOffsets.get(e.getKey()).offset();

我的问题是,提交的偏移量和最新的偏移量因此是从两个不同的主题中查询/请求的。提交的偏移量是从 __consumer_offsets 主题请求的,最新(结束)偏移量是从 CONSUMER_GROUP 的实际主题请求的。

(1) 上面关于请求已提交和最新偏移量的描述是否准确?

(2) 可以直接查询__consumer_offsets topic吗?

谢谢。

【问题讨论】:

标签: java apache-kafka kafka-consumer-api


【解决方案1】:
  1. 是的,您的理解是正确的。提交的偏移量存储在__consumer_offsets 主题中,而您需要查询特定分区以获取它们的结束偏移量。

  2. 是的__consumer_offsets是一个常规话题,你可以直接消费。通过提供的 API 检索数据通常更容易,但如果您对其内容感兴趣,可以使用它。如果您想了解如何反序列化数据,请查看console Formatters

【讨论】:

    猜你喜欢
    • 2020-09-06
    • 2017-03-17
    • 1970-01-01
    • 2014-08-08
    • 2018-07-01
    • 2017-07-17
    • 2020-12-28
    • 2021-11-14
    • 1970-01-01
    相关资源
    最近更新 更多