【发布时间】:2021-11-15 04:11:11
【问题描述】:
我正在使用管理客户端 API 查询 kafka 代理,以使用以下代码获取 CONSUMER_GROUP 的提交偏移量:
Map<TopicPartition, OffsetAndMetadata> offsets =
admin.listConsumerGroupOffsets(CONSUMER_GROUP)
.partitionsToOffsetAndMetadata().get();
上面的代码将触发对一个特殊创建的 __consumer_offsets 主题的查询,以获取 CONSUMER_GROUP 负责的主题分区的每个分区的提交偏移量。
另一方面,我正在使用下面的代码来检索 CONSUMER_GROUP 的每个主题分区的最新(结束)偏移量
for(TopicPartition tp: offsets.keySet()) {
requestLatestOffsets.put(tp, OffsetSpec.latest());
}
Map<TopicPartition, ListOffsetsResult.ListOffsetsResultInfo> latestOffsets =
admin.listOffsets(requestLatestOffsets).all().get();
for (Map.Entry<TopicPartition, OffsetAndMetadata> e: offsets.entrySet()) {
long latestOffset = latestOffsets.get(e.getKey()).offset();
我的问题是,提交的偏移量和最新的偏移量因此是从两个不同的主题中查询/请求的。提交的偏移量是从 __consumer_offsets 主题请求的,最新(结束)偏移量是从 CONSUMER_GROUP 的实际主题请求的。
(1) 上面关于请求已提交和最新偏移量的描述是否准确?
(2) 可以直接查询__consumer_offsets topic吗?
谢谢。
【问题讨论】:
-
2) “查询”是什么意思? You can consume that topic
标签: java apache-kafka kafka-consumer-api