【发布时间】:2019-03-22 16:59:15
【问题描述】:
假设我有一个无限期运行的计时器任务,它遍历 kafka 集群中的所有消费者组,并为每个组的所有分区输出延迟、提交的偏移量和结束偏移量。与 Kafka 控制台消费者组脚本的工作方式类似,但它适用于所有组。
类似
单一消费者 - 不工作 - 不返回某些提供的主题分区的偏移量(例如提供 10 个 - 返回 5 个偏移量)
Consumer consumer;
static {
consumer = createConsumer();
}
run() {
List<String> groupIds = getConsumerGroups();
for(String groupId: groupIds) {
List<TopicParition> topicParitions = getTopicParitions(groupId);
consumer.endOffsets(topicParitions); -- Not working - missing offsets for some partitions for some groups (in 10 - out 5)
}
}
多个消费者 - 工作
run() {
List<String> groupIds = getConsumerGroups();
for(String groupId: groupIds) {
List<TopicParition> topicParitions = getTopicParitions(groupId);
Consumer consumer = createConsumer();
consumer.endOffsets(topicParitions); This works!!!
}
}
版本:Kafka-Client 2.0.0
我是否错误地使用了消费者 API?理想情况下,我想使用单一消费者。
如果您需要更多详细信息,请告诉我。
【问题讨论】:
标签: java apache-kafka kafka-consumer-api spring-kafka