【问题标题】:KafkaTimeoutError: Failed to update metadata after 60.0 secsKafkaTimeoutError:60.0 秒后无法更新元数据
【发布时间】:2020-09-23 16:08:18
【问题描述】:

我有一个高吞吐量 kafka 生产者的用例,我想每秒推送数千条 json 消息。

我有一个 3 节点的 kafka 集群,我正在使用最新的 kafka-python 库,并且有以下方法来生成消息

def publish_to_kafka(topic):
    data = get_data(topic)
    producer = KafkaProducer(bootstrap_servers=['b1', 'b2', 'b3'],
                             value_serializer=lambda x: dumps(x).encode('utf-8'), compression_type='gzip')
    try:
        for obj in data:
           producer.send(topic, value=obj)
    except Exception as e:
            logger.error(e)
    finally:
        producer.close()

我的主题有 3 个分区。

方法有时会正常工作,但会失败并出现错误“KafkaTimeoutError: Failed to update metadata after 60.0 secs。”

我需要更改哪些设置才能使其顺利运行?

【问题讨论】:

  • 您能分享一下您的 Kafka 代理配置 (server.properties) 吗?另外,当您说它有时失败时,您的意思是使用完全相同的主题吗?

标签: python apache-kafka kafka-producer-api kafka-python


【解决方案1】:
  1. 如果某个主题不存在,而您尝试生成该主题并且自动创建主题设置为 false,那么它可能会发生。

    可能的解决方案:在代理配置中(server.properties)auto.create.topics.enable=true(注意,这是 Confluent Kafka 中的默认设置)

  2. 另一种情况可能是网络拥塞或速度,如果使用 Kafka 代理更新元数据的时间超过 60 秒。

    可能的解决方案:生产者配置:max.block.ms = 1200000(120 秒,例如)

  3. 检查您的代理是否因某种原因(例如,负载过多)出现故障,以及它们无法响应元数据请求的原因。通常,您可以在 server.log 文件中看到它们。

【讨论】:

    猜你喜欢
    • 2019-06-14
    • 2021-08-27
    • 2018-06-23
    • 1970-01-01
    • 2019-03-18
    • 1970-01-01
    • 2018-04-06
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多