【问题标题】:Code to find the lag in the consumer offset using kafka library?使用 kafka 库查找消费者偏移量中的滞后的代码?
【发布时间】:2017-02-07 22:46:56
【问题描述】:

我想获得 kafka 消费者的进度,即 Lag。我知道以下命令给了我延迟和其他有价值的描述。

bin/kafka-run-class.sh kafka.admin.ConsumerGroupCommand --zookeeper localhost:2182 --describe --group DemoConsumer

bin/kafka-consumer-groups.sh --zookeeper localhost:2182 --describe --group DemoConsumer

我还可以在 kafka-client 的帮助下使用以下代码 sn-p 获取当前消费者偏移量

ConsumerRecords<Integer, String> records = consumer.poll(100);
for (ConsumerRecord<Integer, String> record : records) {

             System.out.println("Received message: (" + record.topic()+ ",
             " + record.partition()+ ", " + record.key() + ", " +
             record.value() + ") at offset " + record.offset());
}

但是我找不到代码来获取上述两个命令的详细信息。是否有任何代码可以使用 kafka 库查找滞后和其他详细信息?

【问题讨论】:

    标签: apache-kafka kafka-consumer-api


    【解决方案1】:

    根据this topic,可以得到消费者滞后。但是,该主题中的maven依赖是错误的,应该是

    <dependency>
        <groupId>org.apache.kafka</groupId>
        <artifactId>kafka_2.12</artifactId>
        <version>0.10.2.0</version>
    </dependency>
    

    代码是:

    AdminClient client = AdminClient.createSimplePlaintext("localhost:9092");
    Map<TopicPartition, Object> offsets = JavaConversions.asJavaMap(
    client.listGroupOffsets("groupID"));
    Long offset = (Long) offsets.get(new TopicPartition("topic", 0));
    

    【讨论】:

      猜你喜欢
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 2016-10-31
      • 1970-01-01
      • 1970-01-01
      • 2020-08-08
      • 1970-01-01
      • 2017-07-22
      相关资源
      最近更新 更多