【问题标题】:Kafka-Python, Producer send record but Consumer don't receive itKafka-Python,生产者发送记录但消费者没有收到它
【发布时间】:2021-04-25 15:52:46
【问题描述】:

我在将 kafka 用于我的 python 代码时遇到了问题。 我使用 python 2.7.5 和包 kafka-python。

我想通过 kafka 主题发送 csv(300000 行,每行 20 个字段)。在此之前我序列化每个 排入一个 json 文件,直到这里,一切正常。我的 Producer 发送文件的每一行,然后关闭。但 另一方面,我的消费者不消费任何东西......

就 kafka 而言,我有一个单独的主题和一个分区。 我的 kafka 和 zookeeper 实例包含在 docker 容器中,但不是我的消费者或生产者。

这是我的生产者代码: ...

def producer(path) :
    producer = KafkaProducer(bootstrap_servers="localhost:9092", retries = 5)

    with open(path, newline = '', encoding='utf-8-sig') as csvFile :
        reader = csv.DictReader(csvFile, fieldnames = dataElements)
        for row in reader :
            log = process_row(row)
            producer.send(topic = TOPIC, value = json.dumps(log).encode())
    producer.flush()
    producer.close()
    print('processing done')

这是我给消费者的代码:

consumer = KafkaConsumer(bootstrap_servers='localhost:9092')
consumer.subscribe(TOPIC)
for message in consumer:
    log = json.loads(message.value.decode())
    print(log)
consumer.close()

在运行我的生产者之后,我得到了“处理完成”。当我运行我的消费者时,我什么也得不到。 (我先运行我的消费者)。

我阅读了文档,它可能来自生产者配置。我仍然不确定应该修改哪些参数(linger_ms、batch_size ...?)。在我看来,默认值适用于我的情况。

有什么想法吗?

【问题讨论】:

  • 如果我是你,我会使用 Python 3.x,因为不再支持 Python 2.7.x。对于您的问题,您可以使用akhq 来查看您的主题中发生了什么。
  • 我愿意,但这不取决于我。好的,我要试试这个,谢谢!
  • 有可用的迁移脚本,它应该是任何项目迁移的优先事项,因为不再支持以前的 Python。
  • 我会告诉我的主管 :) 所以我检查了我的 docker 容器中的 kafka 实例。我的主题存在并且实际上有 1 个分区。我尝试通过控制台消费者消费,它没有消费任何记录......
  • 我找到了一种解决方案:我创建了 2 个容器(1 个用于我的生产者,1 个用于我的消费者)并且它有效(我只是更改了 ip:ports)。所以问题似乎是连接。在我的 Kafka 容器和我的本地机器之间。

标签: docker kafka-consumer-api kafka-producer-api kafka-python


【解决方案1】:

我使用以下内容解决了这个问题:https://www.kaaproject.org/blog/kafka-docker https://github.com/wurstmeister/kafka-docker/wiki/Connectivity

需要在 docker-compose.yml 中添加一些环境变量,如 KAFKA_ADVERTISED_HOST,以便客户端可以从 docker 主机外部连接到 kafka 代理。

【讨论】:

    猜你喜欢
    • 1970-01-01
    • 1970-01-01
    • 2019-10-03
    • 2018-04-28
    • 1970-01-01
    • 2019-10-13
    • 2017-06-13
    • 2018-12-18
    • 1970-01-01
    相关资源
    最近更新 更多