【发布时间】:2018-12-30 07:56:44
【问题描述】:
我使用 kafka-python 处理 kafka 集群中的消息:
consumer = KafkaConsumer('session', auto_offset_reset='earliest']
当真时:
dict = consumer.poll(500)
for d in dict:
print d.topic, d.partition, d.value
这将给出错误“AttributeError:'TopicPartition'对象没有属性'value'”。
“dict”是这样的(来自'print dict')
{TopicPartition(topic=u'session', partition=0): [ConsumerRecord(topic=u'session', partition=0, offset=56, timestamp=None, timestamp_type=None, key=None, value='0000000000000000', headers=[], checksum=2855809697, serialized_key_size=-1, serialized_value_size=16, serialized_header_size=-1)]}
每个分区下可以有多个分区和数百个 ConsumerRecord。从 consumer.poll() 访问这些 ConsumerRecord 的正确方法是什么?提前致谢。
【问题讨论】:
标签: python pyspark kafka-consumer-api kafka-python