【问题标题】:Log offset, partition and topic from Kafka message来自 Kafka 消息的日志偏移量、分区和主题
【发布时间】:2021-10-20 11:18:14
【问题描述】:

我想从 Kafka 的消息中记录或打印偏移量、分区和主题。 我可以打印消息值,但我想使用 python 查看来自 Kafka 的偏移量和分区,以便我可以调试我的代码

from kafka import KafkaConsumer

consumer = KafkaConsumer('my-topic',
                         group_id='my-group',
                         bootstrap_servers=['localhost:9092'])
for message in consumer:
    print(f"Message is {message.value()}") 

【问题讨论】:

    标签: python apache-kafka kafka-consumer-api kafka-python


    【解决方案1】:

    该数据应使用点表示法提供

    消费者迭代器返回ConsumerRecords,它们是简单的命名元组,公开基本消息属性:主题、分区、偏移量、键和值:

    https://github.com/dpkp/kafka-python

    【讨论】:

    • 我添加了一个示例代码,我可以在其中打印消息,请您帮助我了解如何打印消息的偏移量、主题和分区。
    • 我假设你有print(msg.value)?您对print(msg.topic, msg.partition, msg.offset) 有什么问题?
    • 当我打印(msg.value)时,我得到输出为
    • 好吧,documentation shows msg.value is a dict when deserializing,不是一个函数......如果它是一个函数,那么使用 msg.value() 就像你在你的问题中显示的那样。你需要调用函数...你也可以print(dir(msg))查看对象的所有可用属性
    • logger.debug(f"消费者数据:offset-{message.offset()} partition-{message.partition()} topic-{message.topic()}") 这对我有用谢谢
    【解决方案2】:
    from kafka import KafkaConsumer
    
    consumer = KafkaConsumer('my-topic',
                             group_id='my-group',
                             bootstrap_servers=['localhost:9092'])
    for message in consumer:
        logger.debug(f"Consumer data: offset-{message.offset()} partition-{message.partition()} topic-{message.topic()}")
        print(f"Message is {message.value()}")
    

    【讨论】:

      猜你喜欢
      • 2022-08-15
      • 1970-01-01
      • 2018-05-30
      • 2020-03-05
      • 1970-01-01
      • 1970-01-01
      • 2019-07-20
      • 2021-02-22
      相关资源
      最近更新 更多