安装包 pykafka

代码如下:

from pykafka import KafkaClient


client = KafkaClient(hosts="test43:9092")
print(client.topics)
topic = client.topics[b'rokid']    #topic名称
consumer = topic.get_simple_consumer()
for record in consumer:
    if record is not None:
        valuestr = record.value.decode()   #从bytes转为string类型
        valuedict = eval(valuestr)
        message = valuedict["message"]
        fields = message.split("\u0001")
        for field in fields:
            kv = field.split("\u0002")
            if len(kv) == 2:
                print(kv[0],'----',kv[1])
        print('-'*100)

 

以上仅供开发测试使用,真正发布到线上需要多地方加固。。。

 

mark

相关文章:

  • 2021-06-20
  • 2021-12-28
  • 2021-09-17
  • 2021-11-29
  • 2021-12-07
  • 2021-07-19
  • 2021-06-11
  • 2021-09-15
猜你喜欢
  • 2022-12-23
  • 2022-12-23
  • 2022-12-23
  • 2021-07-06
  • 2021-10-04
  • 2022-02-02
  • 2022-12-23
相关资源
相似解决方案