【发布时间】:2019-11-28 08:42:29
【问题描述】:
我们有一段代码可以获取 kafka 主题消费者的一些详细信息。下面的代码显示了如何获取分区和相应的偏移量。我们需要的缺失信息是消费者组内分区的客户端 ID/消费者。我们有办法获取每个主题分区的消费者吗?
ArrayList<TopicPartition> partitions = new ArrayList<TopicPartition>();
ArrayList<OffsetAndMetadata> offsets = new ArrayList<OffsetAndMetadata>();
for (int i=0;i<consumer.partitionsFor(topic).size();i++)
{
TopicPartition partitiontemp = new TopicPartition(topic, i);
partitions.add(partitiontemp);
OffsetAndMetadata offsettemp = consumer.committed(partitiontemp);
offsets.add(offsettemp);
}
consumer.assign(partitions);
consumer.seekToEnd(partitions);
for (int i=0;i<consumer.partitionsFor(topic).size();i++)
{
try {
long cur_offset = offsets.get(partitions.get(i).partition()).offset();
long log_offset = consumer.position(partitions.get(partitions.get(i).partition()));
System.out.printf("Topic: %s partitionID: %d current offset: %d log offset: %d uncommitted: %d\n",
topic, partitions.get(i).partition(),cur_offset , log_offset , log_offset - cur_offset);
}catch (Exception ex){
System.out.printf("Topic: %s partitionID: %d current offset: - log offset: - uncommitted: -\n", topic, partitions.get(i).partition());
}
}
【问题讨论】:
标签: java apache-kafka kafka-consumer-api