【问题标题】:How to fetch ConsumerRecord from KafkaConsumer.poll() in python如何在 python 中从 KafkaConsumer.poll() 获取 ConsumerRecord
【发布时间】:2018-12-30 07:56:44
【问题描述】:

我使用 kafka-python 处理 kafka 集群中的消息:

consumer = KafkaConsumer('session', auto_offset_reset='earliest']

当真时:

   dict = consumer.poll(500)

   for d in dict:

     print d.topic, d.partition, d.value

这将给出错误“AttributeError:'TopicPartition'对象没有属性'value'”。

“dict”是这样的(来自'print dict')

{TopicPartition(topic=u'session', partition=0): [ConsumerRecord(topic=u'session', partition=0, offset=56, timestamp=None, timestamp_type=None, key=None, value='0000000000000000', headers=[], checksum=2855809697, serialized_key_size=-1, serialized_value_size=16, serialized_header_size=-1)]}

每个分区下可以有多个分区和数百个 ConsumerRecord。从 consumer.poll() 访问这些 ConsumerRecord 的正确方法是什么?提前致谢。

【问题讨论】:

    标签: python pyspark kafka-consumer-api kafka-python


    【解决方案1】:

    你在 dict 使用上有一个错误;默认情况下,“for d in dict:”表示“for d in dict.keys():”,因此您只能获取此 dict 的键。 试试这个:

    dict = consumer.poll(500)
    for key, value in dict.items():
        print(key)
        print()
        for record in value[:10]:
            print(record)
            print()
    

    这可以解决您的错误。

    【讨论】:

      猜你喜欢
      • 2019-05-20
      • 2022-06-16
      • 2021-11-22
      • 2020-09-28
      • 2021-06-03
      • 2020-03-03
      • 2018-03-17
      • 2014-07-06
      • 2019-07-24
      相关资源
      最近更新 更多