【问题标题】:Counting Number of messages stored in a kafka topic计算存储在 kafka 主题中的消息数
【发布时间】:2017-06-07 03:46:33
【问题描述】:

我正在使用 0.9.0.0 版本的 Kafka,我想在不使用管理脚本 kafka-console-consumer.sh 的情况下计算主题中的消息数。

我已经尝试了答案Java, How to get number of messages in a topic in apache kafka中的所有命令 但没有一个产生结果。 有谁能帮帮我吗?

【问题讨论】:

  • 您是否希望它也适用于压缩主题,因为这消除了许多选项,例如比较开始和持续偏移量。
  • 查看我的回答 here 以获得使用 Java 客户端的解决方案。

标签: apache-kafka kafka-consumer-api jms-topic


【解决方案1】:

您可以尝试执行以下命令:

bin/kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list localhost:9092,localhost:9093,localhost:9094 --topic test-topic --time -1

然后,汇总每个分区的所有计数。

更新:Java 实现

Properties props = new Properties();
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
......
try (final KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
    consumer.subscribe(Arrays.asList("your_topic"));
    Set<TopicPartition> assignment;
    while ((assignment = consumer.assignment()).isEmpty()) {
        consumer.poll(Duration.ofMillis(100));
    }
    final Map<TopicPartition, Long> endOffsets = consumer.endOffsets(assignment);
    final Map<TopicPartition, Long> beginningOffsets = consumer.beginningOffsets(assignment);
    assert (endOffsets.size() == beginningOffsets.size());
    assert (endOffsets.keySet().equals(beginningOffsets.keySet()));

    Long totalCount = beginningOffsets.entrySet().stream().mapToLong(entry -> {
            TopicPartition tp = entry.getKey();
            Long beginningOffset = entry.getValue();
            Long endOffset = endOffsets.get(tp);
            return endOffset - beginningOffset;
        }).sum();
    System.out.println(totalCount);
}

【讨论】:

  • 您应该计算最新和最早偏移之间的差异总和。 (--time -2) 参数给出最早的。
  • 你能提供同样的东西的java实现吗?
【解决方案2】:

从技术上讲,您可以简单地使用主题中的所有消息并计算它们:

示例:

kafka-run-class.sh kafka.tools.SimpleConsumerShell --broker-list localhost:9092 --topic XYZ --partition 0*

但是,kafka.tools.GetOffsetShell 方法会为您提供偏移量,而不是主题中的实际消息数。这意味着如果主题被压缩,如果您通过消费消息或读取偏移量来计算消息,您将得到两个不同的数字。

主题压缩:https://kafka.apache.org/documentation.html#design_compactionbasics

【讨论】:

  • 除非时间不相关,否则从 Kafka 的主题中读取可能不计其数(数百万?)的消息(这些消息在被清除之前是持久的 - 不像 JMS - 在读取之前是持久的)是不可行的。
  • 哪个计数可能更高,是偏移量还是消耗的消息数?我猜是第一个?
【解决方案3】:

你可以用这个来总结所有的计数:

.../bin/kafka-run-class kafka.tools.GetOffsetShell --broker-list <<broker_1>>:9092,<<broker_2:9092>>... --topic <<your_topic_name>> --time -1 | while IFS=: read topic_name partition_id number; do echo "$number"; done | paste -sd+ - | bc

【讨论】:

  • 谢谢!简单一点的总结: kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list $KAFKA_CLUSTER_HOSTS --topic $TOPIC_NAME --time -1 | tr ":" " " | awk '{ sum += $3 } END { print sum }'
  • @ozma 代替tr 你也可以使用awk -F: :D
【解决方案4】:

如果您不想接受围绕“原始”Kafka 脚本的麻烦,还有kafkacat

基本思路是

  • 消费每个分区的最后一条消息并
  • 将偏移量相加(校正从零开始的偏移量)。

让我们开发这个。

kafkacat -C -b <broker> -t <topic> -o -1 -f '%p\t%o\n'

这将输出类似这样的内容(加上 stderr 上的“到达分区结束”通知):

0    77
1    75
2    78

现在,kafkacat 不会终止,而是继续等待新消息。我们可以通过添加超时来规避这种情况(选择一​​个足够大的值,以便您获得给定环境中的所有分区):

timeout --preserve-status 1 kafkacat <snip>

现在我们可以继续将第二列相加(每列 +1)——但如果在该超时间隔内有新消息,我们可能会得到类似 this

0    77
1    75
2    78
1    76

所以我们必须考虑到这一点,这很容易做到一点awk

timeout --preserve-status 1 kafkacat <snip> 2> /dev/null \
| awk '{lastOffsets[$1] = $2} END {count = 0; for (i in lastOffsets) { count += lastOffsets[i] + 1 }; print count}'

注意我们如何使用(哈希)映射来记住每个分区的最后一次看到的偏移量,直到超时触发,然后循环遍历数组以计算总和。

【讨论】:

    【解决方案5】:

    你也可以使用 awk 和一个简单的循环来做到这一点

    for i in `kafka-run-class kafka.tools.GetOffsetShell --broker-list broker:9092 --time -1 --topic topic_name| awk -F : '{print $3}'`; do sum=$(($sum+$i)); done
    

    【讨论】:

      【解决方案6】:

      我们可以使用kafkacat 命令来统计一个主题中的消息数。命令如下。请注意,即使您的消息是多行的,此命令也可以工作。

      kafkacat -b <broker_1_ip:port>,<broker_2_ip:port> -t <topic-name> -C -e -q -f 'Offset: %o\n' | wc -l
      

      从控制台上打印的数字中减去 1 即为答案。

      【讨论】:

        【解决方案7】:

        获取topic中的记录数

        brokers="<broker1:port>"
        topic=<topic-name>
        sum_1=$(/usr/hdp/current/kafka-broker/bin/kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list $brokers --topic $topic --time -1 | grep -e ':[[:digit:]]*:' | awk -F  ":" '{sum += $3} END {print sum}')
        sum_2=$(/usr/hdp/current/kafka-broker/bin/kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list $brokers --topic $topic --time -2 | grep -e ':[[:digit:]]*:' | awk -F  ":" '{sum += $3} END {print sum}')
        echo "Number of records in topic ${topic}: "$((sum_1 - sum_2))
        

        【讨论】:

          猜你喜欢
          • 1970-01-01
          • 2021-08-07
          • 2018-03-17
          • 1970-01-01
          • 1970-01-01
          • 2021-01-22
          • 1970-01-01
          • 1970-01-01
          • 2017-10-02
          相关资源
          最近更新 更多