【发布时间】:2020-09-23 16:08:18
【问题描述】:
我有一个高吞吐量 kafka 生产者的用例,我想每秒推送数千条 json 消息。
我有一个 3 节点的 kafka 集群,我正在使用最新的 kafka-python 库,并且有以下方法来生成消息
def publish_to_kafka(topic):
data = get_data(topic)
producer = KafkaProducer(bootstrap_servers=['b1', 'b2', 'b3'],
value_serializer=lambda x: dumps(x).encode('utf-8'), compression_type='gzip')
try:
for obj in data:
producer.send(topic, value=obj)
except Exception as e:
logger.error(e)
finally:
producer.close()
我的主题有 3 个分区。
方法有时会正常工作,但会失败并出现错误“KafkaTimeoutError: Failed to update metadata after 60.0 secs。”
我需要更改哪些设置才能使其顺利运行?
【问题讨论】:
-
您能分享一下您的 Kafka 代理配置 (
server.properties) 吗?另外,当您说它有时失败时,您的意思是使用完全相同的主题吗?
标签: python apache-kafka kafka-producer-api kafka-python