【发布时间】:2022-02-11 19:46:55
【问题描述】:
我得到了以下概念,这是一个真实应用程序的简化示例。问题是producer 在multiprocessing.Process 进程中工作时不会产生消息。
import logging
import multiprocessing
import os
from kafka import KafkaProducer
KAFKA_HOST = os.getenv('KAFKA_HOST')
KAFKA_PORT = os.getenv('KAFKA_PORT')
KAFKA_NAME = os.getenv('KAFKA_NAME')
producer = KafkaProducer(bootstrap_servers=f'{KAFKA_HOST}:{KAFKA_PORT}')
def test_produce():
# producer1 = KafkaProducer(bootstrap_servers=f'{KAFKA_HOST}:{KAFKA_PORT}')
for i in range(10):
print('inserting') # prints once
producer.send(KAFKA_NAME, b'test').get() # will not be commited in the topic
producer.send(KAFKA_NAME, b'test_in_non_thread1').get() # will be commited in the topic
producer.send(KAFKA_NAME, b'test_in_non_thread2').get() # will be commited in the topic
multiprocessing.Process(target=test_produce).start()
【问题讨论】:
标签: python apache-kafka kafka-python