【问题标题】:Java, How to get number of messages in a topic in apache kafkaJava,如何在 apache kafka 中获取主题中的消息数
【发布时间】:2015-04-19 05:42:23
【问题描述】:

我正在使用 apache kafka 进行消息传递。我已经用 Java 实现了生产者和消费者。我们如何获取主题中的消息数?

【问题讨论】:

标签: java messages apache-kafka


【解决方案1】:

它不是java,但可能有用

./bin/kafka-run-class.sh kafka.tools.GetOffsetShell \
  --broker-list <broker>:<port> \
  --topic <topic-name> \
  | awk -F  ":" '{sum += $3} END {print sum}'

【讨论】:

  • 这不应该是每个分区总和的最早和最新偏移量的差异吗? bash-4.3# $KAFKA_HOME/bin/kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list 10.35.25.95:32774 --topic test-topic --time -1 | awk -F ":" '{sum += $3} END {print sum}' 13818663 bash-4.3# $KAFKA_HOME/bin/kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list 10.35.25.95:32774 --topic test-topic --time -2 | awk -F ":" '{sum += $3} END {print sum}' 12434609 然后差异返回主题中的实际待处理消息?我说的对吗?
  • 是的,没错。如果最早的偏移量不等于零,则必须计算差异。
  • 我就是这么想的 :)。
  • 有什么方法可以在代码(JAVA、Scala 或 Python)中将其用作 API 等?
  • 简化@kisna 对确切记录数的回答:brokers="" topic= sum_2=$(/usr/hdp/current/kafka-broker/bin/ kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list $brokers --topic $topic --time -2 | grep -e ':[[:digit:]]*:' | awk -F " :" '{sum += $3} END {print sum}') echo "主题${topic}中的记录数:"$((sum_1 - sum_2))
【解决方案2】:

我有同样的问题,这就是我的做法,来自 Kotlin 中的 KafkaConsumer:

val messageCount = consumer.listTopics().entries.filter { it.key == topicName }
    .map {
        it.value.map { topicInfo -> TopicPartition(topicInfo.topic(), topicInfo.partition()) }
    }.map { consumer.endOffsets(it).values.sum() - consumer.beginningOffsets(it).values.sum()}
    .first()

非常粗略的代码,因为我刚刚开始工作,但基本上你想从结束偏移中减去主题的开始偏移,这将是主题的当前消息计数。

您不能仅仅依赖结束偏移量,因为其他配置(清理策略、保留毫秒等)可能最终导致从您的主题中删除旧消息。 偏移量只会向前“移动”,因此它是开始偏移量将向前移动更接近结束偏移量(或者最终移动到相同的值,如果主题现在不包含消息)。

基本上,结束偏移量表示通过该主题的消息总数,两者之间的差异表示该主题现在包含的消息数。

【讨论】:

    【解决方案3】:

    运行以下命令(假设kafka-console-consumer.sh 在路径上):

    kafka-console-consumer.sh  --from-beginning \
    --bootstrap-server yourbroker:9092 --property print.key=true  \
    --property print.value=false --property print.partition \
    --topic yourtopic --timeout-ms 5000 | tail -n 10|grep "Processed a total of"
    

    【讨论】:

    • 注意:我删除了--new-consumer,因为该选项不再可用(或显然是必需的)
    【解决方案4】:

    由于不再支持ConsumerOffsetChecker,您可以使用此命令检查主题中的所有消息:

    bin/kafka-run-class.sh kafka.admin.ConsumerGroupCommand \
        --group my-group \
        --bootstrap-server localhost:9092 \
        --describe
    

    其中LAG 是主题分区中的消息数:

    您也可以尝试使用kafkacat。这是一个开源项目,可以帮助您从主题和分区中读取消息并将它们打印到标准输出。下面是一个示例,它从sample-kafka-topic 主题读取最后 10 条消息,然后退出:

    kafkacat -b localhost:9092 -t sample-kafka-topic -p 0 -o -10 -e
    

    【讨论】:

    • 这个答案不够精确。 LAG 是消费者等待消费的消息数量。不是分区中的消息总数。对分区中的消息总数(但仍然有些误导)更准确一点的值是 LOG-END-OFFSET。
    【解决方案5】:

    我发现最简单的方法是使用 Kafdrop REST API /topic/topicName 并指定键:"Accept" / 值:"application/json" 标头,以获取 JSON 响应。

    This is documented here.

    【讨论】:

      【解决方案6】:

      如果您可以访问服务器的 JMX 接口,则开始和结束偏移量存在于:

      kafka.log:type=Log,name=LogStartOffset,topic=TOPICNAME,partition=PARTITIONNUMBER
      kafka.log:type=Log,name=LogEndOffset,topic=TOPICNAME,partition=PARTITIONNUMBER
      

      (你需要替换TOPICNAME & PARTITIONNUMBER)。 请记住,您需要检查给定分区的每个副本,或者您需要找出哪个代理是 given 分区的领导者(这可能会随着时间而改变)。

      或者,您可以使用Kafka Consumer 方法beginningOffsetsendOffsets

      【讨论】:

      • 让我看看我是否正确:启用 JMX。获取所有指标。选择一个主题和一个分区。对于该主题/分区组合,获取 LogEndOffset 和 LogStartOffset。做差异。这是队列中的消息数。对吗?
      • 如果一个主题有多个分区,那么我需要为每个分区分别做这个数学吗?然后添加结果? (我是Kafka新手,之前只用过RabbitMQ。)
      【解决方案7】:

      Kafka 文档节选

      0.9.0.0 中的弃用

      kafka-consumer-offset-checker.sh (kafka.tools.ConsumerOffsetChecker) 已被弃用。今后,请使用 kafka-consumer-groups.sh (kafka.admin.ConsumerGroupCommand) 来实现此功能。

      我正在运行为服务器和客户端启用 SSL 的 Kafka 代理。下面的命令我使用

      kafka-consumer-groups.sh --bootstrap-server Broker_IP:Port --list --command-config /tmp/ssl_config kafka-consumer-groups.sh --bootstrap-server Broker_IP:Port --command-config /tmp/ssl_config --describe --group group_name_x

      /tmp/ssl_config 如下

      security.protocol=SSL
      ssl.truststore.location=truststore_file_path.jks
      ssl.truststore.password=truststore_password
      ssl.keystore.location=keystore_file_path.jks
      ssl.keystore.password=keystore_password
      ssl.key.password=key_password
      

      【讨论】:

        【解决方案8】:

        有时感兴趣的是了解每个分区中的消息数量,例如,在测试自定义分区器时。随后的步骤已经过测试,可与 Confluent 3.2 中的 Kafka 0.10.2.1-2 一起使用。给定一个 Kafka 主题,kt 和以下命令行:

        $ kafka-run-class kafka.tools.GetOffsetShell \
          --broker-list host01:9092,host02:9092,host02:9092 --topic kt
        

        打印显示三个分区中消息计数的示例输出:

        kt:2:6138
        kt:1:6123
        kt:0:6137
        

        行数可能或多或少取决于主题的分区数。

        【讨论】:

        • 如果启用了日志压缩,那么对分区的偏移量求和可能无法给出主题中消息的准确计数。
        【解决方案9】:

        在最新版本的 Kafka Manager 中,有一列标题为Summed Recent Offsets

        【讨论】:

          【解决方案10】:

          使用Kafka 2.11-1.0.0的Java客户端,你可以做以下事情:

              KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
              consumer.subscribe(Collections.singletonList("test"));
              while(true) {
                  ConsumerRecords<String, String> records = consumer.poll(100);
                  for (ConsumerRecord<String, String> record : records) {
                      System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());
          
                      // after each message, query the number of messages of the topic
                      Set<TopicPartition> partitions = consumer.assignment();
                      Map<TopicPartition, Long> offsets = consumer.endOffsets(partitions);
                      for(TopicPartition partition : offsets.keySet()) {
                          System.out.printf("partition %s is at %d\n", partition.topic(), offsets.get(partition));
                      }
                  }
              }
          

          输出是这样的:

          offset = 10, key = null, value = un
          partition test is at 13
          offset = 11, key = null, value = deux
          partition test is at 13
          offset = 12, key = null, value = trois
          partition test is at 13
          

          【讨论】:

          • 与@AutomatedMike 答案相比,我更喜欢你的答案,因为你的答案不会与seekToEnd(..)seekToBeginning(..) 改变consumer 状态的方法混淆。
          【解决方案11】:

          我实际上用它来对我的 POC 进行基准测试。您要使用 ConsumerOffsetChecker 的项目。您可以使用下面的 bash 脚本运行它。

          bin/kafka-run-class.sh kafka.tools.ConsumerOffsetChecker  --topic test --zookeeper localhost:2181 --group testgroup
          

          结果如下: 如您在红色框中看到的,999 是当前主题中的消息数。

          更新:ConsumerOffsetChecker 自 0.10.0 起已弃用,您可能希望开始使用 ConsumerGroupCommand。

          【讨论】:

          • 请注意,ConsumerOffsetChecker 已被弃用,并将在 0.9.0 之后的版本中删除。请改用 ConsumerGroupCommand。 (kafka.tools.ConsumerOffsetChecker$)
          • 是的,我就是这么说的。
          • 你最后一句话不准确。上述命令在 0.10.0.1 中仍然有效,警告与我之前的评论相同。
          【解决方案12】:

          要获取为该主题存储的所有消息,您可以将消费者寻找到每个分区的流的开头和结尾,并对结果求和

          List<TopicPartition> partitions = consumer.partitionsFor(topic).stream()
                  .map(p -> new TopicPartition(topic, p.partition()))
                  .collect(Collectors.toList());
              consumer.assign(partitions); 
              consumer.seekToEnd(Collections.emptySet());
          Map<TopicPartition, Long> endPartitions = partitions.stream()
                  .collect(Collectors.toMap(Function.identity(), consumer::position));
              consumer.seekToBeginning(Collections.emptySet());
          System.out.println(partitions.stream().mapToLong(p -> endPartitions.get(p) - consumer.position(p)).sum());
          

          【讨论】:

          • 顺便说一句,如果您打开了压缩,则流中可能存在间隙,因此实际消息数可能低于此处计算的总数。要获得准确的总数,您将不得不重播消息并计算它们。
          【解决方案13】:

          Apache Kafka 命令获取主题所有分区上未处理的消息:

          kafka-run-class kafka.tools.ConsumerOffsetChecker 
              --topic test --zookeeper localhost:2181 
              --group test_group
          

          打印:

          Group      Topic        Pid Offset          logSize         Lag             Owner
          test_group test         0   11051           11053           2               none
          test_group test         1   10810           10812           2               none
          test_group test         2   11027           11028           1               none
          

          第 6 列是未处理的消息。像这样把它们加起来:

          kafka-run-class kafka.tools.ConsumerOffsetChecker 
              --topic test --zookeeper localhost:2181 
              --group test_group 2>/dev/null | awk 'NR>1 {sum += $6} 
              END {print sum}'
          

          awk 读取行,跳过标题行并将第 6 列相加,最后打印总和。

          打印

          5
          

          【讨论】:

            【解决方案14】:

            使用https://prestodb.io/docs/current/connector/kafka-tutorial.html

            由 Facebook 提供的超级 SQL 引擎,可连接多个数据源(Cassandra、Kafka、JMX、Redis ...)。

            PrestoDB 作为带有可选工作器的服务器运行(有一个没有额外工作器的独立模式),然后您使用一个小的可执行 JAR(称为 presto CLI)进行查询。

            配置好 Presto 服务器后,就可以使用传统的 SQL 了:

            SELECT count(*) FROM TOPIC_NAME;
            

            【讨论】:

            • 这个工具很好,但是如果你的主题有两个以上的点,它就不起作用了。
            【解决方案15】:

            我自己没有尝试过this,但这似乎是有道理的。

            您也可以使用kafka.tools.ConsumerOffsetChecker (source)。

            【讨论】:

              【解决方案16】:

              从消费者的角度来看,唯一想到的方法是实际消费消息并计算它们。

              Kafka 代理公开 JMX 计数器,用于记录自启动以来收到的消息数量,但您无法知道其中有多少已被清除。

              在最常见的场景中,Kafka 中的消息最好被视为无限流,获取当前保存在磁盘上的数量的离散值并不相关。此外,当处理在一个主题中都有消息子集的代理集群时,事情会变得更加复杂。

              【讨论】:

              猜你喜欢
              • 2015-08-01
              • 2018-01-19
              • 2018-07-07
              • 2020-02-16
              • 2019-02-10
              • 1970-01-01
              • 1970-01-01
              • 2015-07-03
              • 1970-01-01
              相关资源
              最近更新 更多