【问题标题】:How to get latest offset/size of a Kafka topic using KafkaAdminClient (Java) for 2.x version如何使用 2.x 版本的 KafkaAdminClient (Java) 获取 Kafka 主题的最新偏移量/大小
【发布时间】:2020-05-20 21:14:40
【问题描述】:

是否有更有效/更简单的方法来使用最新的 Java 中的 Kafka 客户端 2.4 API 获取主题/分区的大小/最新偏移量? 然后,通过将该组的偏移量与主题的大小进行比较来计算该消费者组的 Lag...

我知道这个问题已经被问到较旧的 Kafka 版本,还有一种方法可以从 Kafka 公开的 JMX 指标中获取这些信息,但是我被一个需要在 Java 中执行但最新 2.4 的遗留应用程序困住了Kafka 库。

据我了解,获取此信息的常用方法是:

  • 最简单的部分:使用 KafkaAdminClient 上的 API 调用获取消费者 groupID 的主题/分区的偏移量,例如 public ListConsumerGroupOffsetsResult listConsumerGroupOffsets(String groupId, ListConsumerGroupOffsetsOptions options)
  • 最难的部分:确定每个分区的主题大小:
    • 创建一个新的消费者并订阅主题
    • 使用consumer.seekToEnd(...) 将消费者提前到最新的偏移量
    • 使用consumer.position(...)获取所有分区的消费者位置
  • 最后,做 [size - current offset] 来确定每个分区的消费者组的滞后

因此,确定最后一个偏移量是一项相当繁重的操作...... 所以我的问题是:是否有一种更有效的方法可以在不使用虚拟消费者的情况下获取主题的最后偏移量,也许在最新的 2.4 API 中?主题/分区大小信息确实独立于任何消费者,因此在不使用消费者的情况下能够获得它似乎是合乎逻辑的......

谢谢!

码头

【问题讨论】:

  • 你仍然需要使用消费者,但你可以使用endOffsets 方法,如果它更容易的话。在 2.5(2 月底发布)中,您可以在管理客户端上使用方法 listOffsets 来获取结束偏移量。
  • 这是关于 2.5 的好消息! @user2683814 - 如果您发布您的评论作为答案 - 我会很乐意接受它:)

标签: java apache-kafka


【解决方案1】:

对于使用 kafka 的应用程序,您是正确的,您的选择是查看分区结束偏移量与消费者组的最新检查点位置(假设有问题的消费者甚至使用 kafka 来存储偏移量)。

有一些工具可以为您监控,例如burrow

但是,如果您可以访问消费应用程序本身,那么还有一种更准确的方法。这是所有消费者传感器的列表(默认通过 API 或 jmx 公开)https://kafka.apache.org/documentation/#consumer_fetch_monitoring

每个分区都有一个 records-lag 指标。每次调用 poll() 时都会更新它,因此比提交的偏移量更准确且延迟更低。唯一的复杂之处是您需要对分配给消费者的所有分区中的这些传感器的值求和。

这是通过KafkaConsumer.metrics()获取它的方法:

private long calcTotalLag(Map<MetricName, ? extends Metric> metrics) {
   long totalLag = 0;
   for (Map.Entry<MetricName, ? extends Metric> entry : metrics.entrySet()) {
     MetricName metricName = entry.getKey();
     Metric metric = entry.getValue();
     Map<String, String> tags = metricName.tags();
     if (metricName.name().equals("records-lag") && tags.containsKey("partition")) {
        totalLag += ((Number) metric.metricValue()).longValue();
     }
   }

   return totalLag;
}

【讨论】:

  • 这很有趣!我可以使用哪个 API 来获取此信息?我浏览了 KafkaConsumer API,但没有看到 Lag 的任何内容。不幸的是,由于开放端口的限制,我目前无法使用 JMX 指标...
  • @Marina - 通过 metrics() 映射。我已经更新了我的答案以包含执行此操作的代码。
  • 谢谢!不幸的是,我需要从与实际消费者分开的服务中进行这种监控,但我会记住这个选项,以防我可以重新设计该方法。
猜你喜欢
  • 2016-05-27
  • 2017-12-12
  • 2020-08-04
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 2016-11-20
  • 1970-01-01
  • 1970-01-01
相关资源
最近更新 更多