【问题标题】:Get the consumer or client id assigned to Kafka partitions获取分配给 Kafka 分区的消费者或客户端 ID
【发布时间】:2019-11-28 08:42:29
【问题描述】:

我们有一段代码可以获取 kafka 主题消费者的一些详细信息。下面的代码显示了如何获取分区和相应的偏移量。我们需要的缺失信息是消费者组内分区的客户端 ID/消费者。我们有办法获取每个主题分区的消费者吗?

    ArrayList<TopicPartition> partitions = new ArrayList<TopicPartition>();
    ArrayList<OffsetAndMetadata> offsets = new ArrayList<OffsetAndMetadata>();

    for (int i=0;i<consumer.partitionsFor(topic).size();i++)
    {
        TopicPartition partitiontemp = new TopicPartition(topic, i);
        partitions.add(partitiontemp);
        OffsetAndMetadata offsettemp = consumer.committed(partitiontemp);
        offsets.add(offsettemp);
    }

    consumer.assign(partitions);
    consumer.seekToEnd(partitions);

    for (int i=0;i<consumer.partitionsFor(topic).size();i++)
    {
        try {

            long cur_offset = offsets.get(partitions.get(i).partition()).offset();
            long log_offset = consumer.position(partitions.get(partitions.get(i).partition()));

            System.out.printf("Topic: %s partitionID: %d current offset: %d log offset: %d uncommitted: %d\n",
                    topic, partitions.get(i).partition(),cur_offset , log_offset , log_offset - cur_offset);


        }catch (Exception ex){
            System.out.printf("Topic: %s partitionID: %d current offset: - log offset: - uncommitted: -\n", topic, partitions.get(i).partition());

        }
    }

【问题讨论】:

标签: java apache-kafka kafka-consumer-api


【解决方案1】:

您可能是指消费者的group.id 属性,因为偏移量是每个组的,也不是每个client.id 的。本次讨论stackoverflow.com/questions/55937806/apache-kafka-get-list-of-consumers-on-a-specific-topic/55938325

可以回答你的问题。

【讨论】:

    猜你喜欢
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2017-11-10
    • 1970-01-01
    • 2022-06-14
    • 2018-07-13
    • 1970-01-01
    • 2017-01-04
    相关资源
    最近更新 更多