【发布时间】:2021-04-25 15:52:46
【问题描述】:
我在将 kafka 用于我的 python 代码时遇到了问题。 我使用 python 2.7.5 和包 kafka-python。
我想通过 kafka 主题发送 csv(300000 行,每行 20 个字段)。在此之前我序列化每个 排入一个 json 文件,直到这里,一切正常。我的 Producer 发送文件的每一行,然后关闭。但 另一方面,我的消费者不消费任何东西......
就 kafka 而言,我有一个单独的主题和一个分区。 我的 kafka 和 zookeeper 实例包含在 docker 容器中,但不是我的消费者或生产者。
这是我的生产者代码: ...
def producer(path) :
producer = KafkaProducer(bootstrap_servers="localhost:9092", retries = 5)
with open(path, newline = '', encoding='utf-8-sig') as csvFile :
reader = csv.DictReader(csvFile, fieldnames = dataElements)
for row in reader :
log = process_row(row)
producer.send(topic = TOPIC, value = json.dumps(log).encode())
producer.flush()
producer.close()
print('processing done')
这是我给消费者的代码:
consumer = KafkaConsumer(bootstrap_servers='localhost:9092')
consumer.subscribe(TOPIC)
for message in consumer:
log = json.loads(message.value.decode())
print(log)
consumer.close()
在运行我的生产者之后,我得到了“处理完成”。当我运行我的消费者时,我什么也得不到。 (我先运行我的消费者)。
我阅读了文档,它可能来自生产者配置。我仍然不确定应该修改哪些参数(linger_ms、batch_size ...?)。在我看来,默认值适用于我的情况。
有什么想法吗?
【问题讨论】:
-
如果我是你,我会使用 Python 3.x,因为不再支持 Python 2.7.x。对于您的问题,您可以使用akhq 来查看您的主题中发生了什么。
-
我愿意,但这不取决于我。好的,我要试试这个,谢谢!
-
有可用的迁移脚本,它应该是任何项目迁移的优先事项,因为不再支持以前的 Python。
-
我会告诉我的主管 :) 所以我检查了我的 docker 容器中的 kafka 实例。我的主题存在并且实际上有 1 个分区。我尝试通过控制台消费者消费,它没有消费任何记录......
-
我找到了一种解决方案:我创建了 2 个容器(1 个用于我的生产者,1 个用于我的消费者)并且它有效(我只是更改了 ip:ports)。所以问题似乎是连接。在我的 Kafka 容器和我的本地机器之间。
标签: docker kafka-consumer-api kafka-producer-api kafka-python