【发布时间】:2017-07-14 00:33:18
【问题描述】:
我使用 Kafka 2.12 和 kafka-python 模块作为 Kafka 客户端。我正在尝试测试一个简单的生产者:
class Producer(Process):
daemon = True
def run(self):
producer = KafkaProducer(bootstrap_servers='kafka:9092')
print("Sending messages...")
producer.send('topic', json.dumps(message).encode('utf-8'))
当这个进程被实例化时,消息永远不会被消费者接收到
如果我刷新生产者并更改 linger_ms 参数(使其同步),则消息由消费者发送和读取:
class Producer(Process):
daemon = True
def run(self):
producer = KafkaProducer(bootstrap_servers='kafka:9092', linger_ms=10)
print("Sending messages...")
producer.send('topic', json.dumps(message).encode('utf-8'))
producer.flush()
在以前的 Kafka 版本中,有参数 queue.buffering.max.ms 来指定生产者将等待多长时间直到发送队列中的消息,但在最新版本中不存在(kafka-python 1.3. 3)。我如何在较新的 Kafka 版本中指定这一点以保持我的通信异步?
谢谢!
【问题讨论】:
标签: python apache-kafka kafka-python