【问题标题】:How to count number of records (message) in the topic using kafka-python如何使用kafka-python计算主题中的记录(消息)数
【发布时间】:2021-01-22 14:58:45
【问题描述】:

正如标题中所说,我想在我的主题中获得一些记录,但我找不到使用 kafka-python 库的解决方案。 有人知道吗?

【问题讨论】:

  • 是的@mike。这个解决方案与它非常相似。我会尝试在 Python 中重现它(我不太擅长 Java)
  • 我无法使用命令行。在我的项目中,我需要使用 Python Kafka API

标签: python apache-kafka kafka-python


【解决方案1】:

没有特定的 API 来计算来自主题的记录数。您需要消费并统计您从 kafka 消费者那里收到的记录数。

【讨论】:

    【解决方案2】:

    一种解决方案是您可以将一条消息添加到所有分区并获取最后一个偏移量。根据偏移量,您可以计算到现在发送到主题的总消息数。

    但这不是正确的方法。您不知道消费者已经消费了多少消息,以及 kafka 删除了多少消息。唯一的方法是您可以消费消息并计算数量。

    【讨论】:

    • 当我在我的主题中消费消息时,它只返回一条消息。你知道我如何消费所有消息吗?我的消费者功能:bootstrap_servers = bootstrap_server_list.split(",") consumer_kafka = KafkaConsumer(topic, bootstrap_servers=bootstrap_servers, auto_offset_reset='earliest', consumer_timeout_ms=10000) consumer_kafka.subscribe([topic]) for message in consumer_kafka: if len(message) > 0: print(message)
    • 我不确定如何在 kafka-python 中使用消息。但我尝试运行 - from kafka import KafkaConsumer bootstrap_servers = ['localhost:9092'] consumer_kafka = KafkaConsumer(bootstrap_servers=bootstrap_servers, auto_offset_reset='earliest',consumer_timeout_ms=10000) consumer_kafka.subscribe(['test']) for message in consumer_kafka: if len(message) > 0: print(message) 它工作正常并消耗所有消息。您可以查看此代码,希望对您有所帮助。
    • 这实际上是在无限循环中,我们如何摆脱它??
    【解决方案3】:

    我无法使用 kafka-python 完成这项工作,但我可以使用 confluent-kafka 库相当轻松地做到这一点:

    from confluent_kafka import Consumer
    
    topic = "test_topic"
    broker = "localhost:9092"
    
    def get_count():
        consumer = Consumer({
            'bootstrap.servers': broker,
            'group.id': 'my-group',
            'auto.offset.reset': 'earliest',
        })
    
        consumer.subscribe([topic])
    
        total_message_count = 0
        while True:
            msg = consumer.poll(1.0)
    
            if msg is None:
                print("No more messages")
                break
            if msg.error():
                print("Consumer error: {}".format(msg.error()))
                continue
    
            total_message_count = total_message_count + 1
            print('Received message {}: {}'.format(total_message_count,     
    msg.value().decode('utf-8')))
    
        consumer.close()
    
        print(total_message_count)
    

    【讨论】:

      猜你喜欢
      • 2021-08-07
      • 1970-01-01
      • 2017-06-07
      • 2018-03-17
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 2019-11-25
      • 2017-02-20
      相关资源
      最近更新 更多