【问题标题】:how to get kafka lag using java如何使用java获得kafka滞后
【发布时间】:2019-03-24 02:14:47
【问题描述】:

我目前开发了一个代码,可以显示主题、分区和日志偏移量。但我目前坚持如何获得分区的滞后。我知道有一个 kafka offset 命令可以执行此功能,但我需要的是一个 java 代码。

public static void main(String[] args) throws Exception {
    System.out.println("START CONSUMER");final Properties props = new Properties();
    props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
    props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
    props.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SASL_PLAINTEXT");
    props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS);
    props.put(ConsumerConfig.GROUP_ID_CONFIG, GROUPID);
    props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest");
    props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");

    // Create the consumer using props.
    final Consumer<Long, String> consumer =  new KafkaConsumer<>(props);

    // Subscribe to the topic.
    int i = 0;
    ArrayList<TopicPartition> partitions = new ArrayList<TopicPartition>();
    for (i=0;i<consumer.partitionsFor(TOPIC).size();i++)
    {
        TopicPartition partitiontemp = new TopicPartition(TOPIC, i);
        partitions.add(partitiontemp);
    }
    consumer.assign(partitions);
    consumer.seekToEnd(partitions);

    for (i=0;i<consumer.partitionsFor(TOPIC).size();i++)
    {
        System.out.printf("Topic: %s partitionID: %d log offset: %d \n", TOPIC, i, consumer.position(partitions.get(i)));
    }

    System.out.printf("CREATE CONSUMER DONE");
    consumer.close();

我需要做的是输出主题、分区、当前偏移量、日志偏移量和滞后。如何获得我的代码的滞后或如何获得我的代码的当前偏移量。 (有关所需输出,请参见图片)。

注意:我不能使用(foreach 记录)功能,因为我不能读取输入文件中的每条记录。

【问题讨论】:

  • 为什么需要的输出标签不同?
  • Kafka offset命令也运行Java(其实是Scala),源码在Github上,如果你只是想复现的话

标签: java apache-kafka kafka-consumer-api


【解决方案1】:

要重现 kafka-consumer-groups 功能,您需要一个 Consumer 和一个 AdminClient 实例。

首先,使用 AdminClient,您可以调用 listConsumerGroupOffsets() 来检索特定组的主题分区列表和提交的偏移量。

然后使用 Consumer 获取这些分区的 End 偏移量。您使用的方法效率低下,无需分配和寻找结束偏移量。您可以直接拨打endOffsets()

这足以重现屏幕截图中包含的数据。

kafka-consumer-groups 还使用AdminClient.describeConsumerGroups() 打印分配给每个分区的组成员(如果有)。

【讨论】:

  • consumer.endOffsets 在请求结束偏移量时是否与来自不同组的其他消费者竞争?要求最新的偏移量与在幕后消费消息相同吗?它会导致重新平衡吗?另请参阅您是否有兴趣回答问题here。感谢回复。谢谢!
  • endOffsets() 不需要消费者组,因此无需重新平衡。它也不涉及消费任何消息。这就是为什么它比您发布的逻辑更有效的原因。
  • 谢谢米凯尔。 listConsumerGroupOffsets 呢?我有非常繁忙的性能敏感主题,有 10 个分区,有 10 个以上的消费者,每个消费者都在消费来自应用程序 A 的消息。当我从应用程序 B 发出 listConsumerGroupOffsets 请求时,它们的处理是否会受到影响?如果有的话,我在收集偏移量指标时应该注意哪些其他事情?谢谢!
  • listConsumerGroupOffsets() 只是发送一个OffsetFetch 请求。它不会影响现有组,并且其性能成本微不足道。
【解决方案2】:

您可以通过从消费者那里获取 EndOffset 来获得 LAG

Set<TopicPartition> partitionSet = consumer.assignment();
Map<TopicPartition, Long> endOffsets =consumer.endOffsets(consumer.assignment());

然后迭代where over set

for(TopicPartition tp : partitionSet) { LOG.info("Topic :: {} ,EndOffset :: {}, currentOffset {}",tp.topic(),beginningOffsets.get(tp),endOffsets.get(tp), consumer.position(tp)); }

consumer.position(tp) -- 会得到你当前的偏移量,从 endoffset 中减去这个,你会得到 LAG

【讨论】:

    猜你喜欢
    • 2019-01-13
    • 2018-09-24
    • 1970-01-01
    • 2017-07-01
    • 2021-03-09
    • 1970-01-01
    • 2019-09-27
    • 2021-12-03
    • 1970-01-01
    相关资源
    最近更新 更多