【问题标题】:KafkaTimeoutError: Failed to update metadata after 60.0 secs with kafka-pythonKafkaTimeoutError:使用 kafka-python 60.0 秒后无法更新元数据
【发布时间】:2021-08-27 23:47:30
【问题描述】:

我在localhost 上安装了kafka。它正在运行。

● kafka.service - Apache Kafka Server
     Loaded: loaded (/etc/systemd/system/kafka.service; enabled; vendor preset: enabled)
     Active: active (running) since Fri 2021-06-11 03:36:36 EEST; 23min ago
       Docs: http://kafka.apache.org/documentation.html
   Main PID: 103920 (java)
      Tasks: 71 (limit: 9353)
     Memory: 367.0M
     CGroup: /system.slice/kafka.service
             └─103920 /usr/lib/jvm/java-11-openjdk-amd64/bin/java -Xmx1G -Xms1G -server -XX:+UseG1GC -XX:MaxGCPauseMillis=20 -XX:InitiatingHe>

июн 11 03:36:41 dmytruk kafka-server-start.sh[103920]: [2021-06-11 03:36:41,214] INFO [/config/changes-event-process-thread]: Starting (kafka>
июн 11 03:36:41 dmytruk kafka-server-start.sh[103920]: [2021-06-11 03:36:41,292] INFO [SocketServer brokerId=0] Starting socket server accept>
июн 11 03:36:41 dmytruk kafka-server-start.sh[103920]: [2021-06-11 03:36:41,300] INFO [SocketServer brokerId=0] Started data-plane acceptor a>
июн 11 03:36:41 dmytruk kafka-server-start.sh[103920]: [2021-06-11 03:36:41,300] INFO [SocketServer brokerId=0] Started socket server accepto>
июн 11 03:36:41 dmytruk kafka-server-start.sh[103920]: [2021-06-11 03:36:41,307] INFO Kafka version: 2.7.0 (org.apache.kafka.common.utils.App>
июн 11 03:36:41 dmytruk kafka-server-start.sh[103920]: [2021-06-11 03:36:41,307] INFO Kafka commitId: 448719dc99a19793 (org.apache.kafka.comm>
июн 11 03:36:41 dmytruk kafka-server-start.sh[103920]: [2021-06-11 03:36:41,307] INFO Kafka startTimeMs: 1623371801301 (org.apache.kafka.comm>
июн 11 03:36:41 dmytruk kafka-server-start.sh[103920]: [2021-06-11 03:36:41,317] INFO [KafkaServer id=0] started (kafka.server.KafkaServer)
июн 11 03:57:24 dmytruk kafka-server-start.sh[103920]: [2021-06-11 03:57:24,797] INFO Creating topic PairsUpdated with configuration {} and initial>
июн 11 03:57:25 dmytruk kafka-server-start.sh[103920]: [2021-06-11 03:57:25,022] INFO [KafkaApi-0] Auto creation of topic PairsUpdated with 1 parti>

我使用kafka-python 作为客户。当我尝试发送消息时,我收到错误消息。

ma​​in.py

def send_kafka(topic: str, data: dict):
    kafka_producer = KafkaProducer(
        bootstrap_servers=["localhost:9092"],
        value_serializer=lambda m: json.dumps(m).encode("utf-8"),
    )
    kafka_producer.send("PairsUpdated", b"")

kafka.log

DEBUG:kafka.conn:<BrokerConnection node_id=bootstrap-0 host=localhost:9092 <connected> [IPv4 ('127.0.0.1', 9092)]> Request 68: MetadataRequest_v1(topics=['PairsUpdated'])
DEBUG:kafka.protocol.parser:Received correlation id: 68
DEBUG:kafka.protocol.parser:Processing response MetadataResponse_v1
DEBUG:kafka.conn:<BrokerConnection node_id=bootstrap-0 host=localhost:9092 <connected> [IPv4 ('127.0.0.1', 9092)]> Response 68 (3.993988037109375 ms): MetadataResponse_v1(brokers=[], controller_id=-1, topics=[(error_code=5, topic='PairsUpdated', is_internal=False, partitions=[])])
DEBUG:kafka.producer.kafka:_wait_on_metadata woke after 6.909250736236572 secs.
DEBUG:kafka.producer.kafka:Requesting metadata update for topic PairsUpdated
DEBUG:kafka.client:Sending metadata request MetadataRequest_v1(topics=['PairsUpdated']) to node bootstrap-0
DEBUG:kafka.protocol.parser:Sending request MetadataRequest_v1(topics=['PairsUpdated'])

python.log

Internal Server Error: /api/v1/pairs/
Traceback (most recent call last):
  File "/home/m0nte-cr1st0/work_projects/trading_bot/backoffice/venv/lib/python3.8/site-packages/django/core/handlers/exception.py", line 47, in inner
    response = get_response(request)
  File "/home/m0nte-cr1st0/work_projects/trading_bot/backoffice/venv/lib/python3.8/site-packages/django/core/handlers/base.py", line 181, in _get_response
    response = wrapped_callback(request, *callback_args, **callback_kwargs)
  File "/home/m0nte-cr1st0/work_projects/trading_bot/backoffice/venv/lib/python3.8/site-packages/django/views/decorators/csrf.py", line 54, in wrapped_view
    return view_func(*args, **kwargs)
  File "/home/m0nte-cr1st0/work_projects/trading_bot/backoffice/venv/lib/python3.8/site-packages/rest_framework/viewsets.py", line 125, in view
    return self.dispatch(request, *args, **kwargs)
  File "/home/m0nte-cr1st0/work_projects/trading_bot/backoffice/venv/lib/python3.8/site-packages/rest_framework/views.py", line 509, in dispatch
    response = self.handle_exception(exc)
  File "/home/m0nte-cr1st0/work_projects/trading_bot/backoffice/venv/lib/python3.8/site-packages/rest_framework/views.py", line 469, in handle_exception
    self.raise_uncaught_exception(exc)
  File "/home/m0nte-cr1st0/work_projects/trading_bot/backoffice/venv/lib/python3.8/site-packages/rest_framework/views.py", line 480, in raise_uncaught_exception
    raise exc
  File "/home/m0nte-cr1st0/work_projects/trading_bot/backoffice/venv/lib/python3.8/site-packages/rest_framework/views.py", line 506, in dispatch
    response = handler(request, *args, **kwargs)
  File "/home/m0nte-cr1st0/work_projects/trading_bot/backoffice/backoffice/controllers.py", line 78, in create
    send_kafka("PairsUpdated", b"")
  File "/home/m0nte-cr1st0/work_projects/trading_bot/backoffice/backoffice/main.py", line 33, in send_kafka
    kafka_producer.send("PairsUpdated", b"")
  File "/home/m0nte-cr1st0/work_projects/trading_bot/backoffice/venv/lib/python3.8/site-packages/kafka/producer/kafka.py", line 576, in send
    self._wait_on_metadata(topic, self.config['max_block_ms'] / 1000.0)
  File "/home/m0nte-cr1st0/work_projects/trading_bot/backoffice/venv/lib/python3.8/site-packages/kafka/producer/kafka.py", line 702, in _wait_on_metadata
    raise Errors.KafkaTimeoutError(
kafka.errors.KafkaTimeoutError: KafkaTimeoutError: Failed to update metadata after 60.0 secs.

远程登录

>>> telnet 127.0.0.1 9092
Trying 127.0.0.1...
Connected to 127.0.0.1.
Escape character is '^]'.

Ubuntu==20.04

Python==3.8.5

kafka-python==2.0.2

kafka.service

[Unit]
Description=Apache Kafka Server
Documentation=http://kafka.apache.org/documentation.html
Requires=zookeeper.service
After=zookeeper.service

[Service]
Type=simple
Environment="JAVA_HOME=/usr/lib/jvm/java-11-openjdk-amd64"
ExecStart=/usr/local/kafka-server/bin/kafka-server-start.sh /usr/local/kafka-server/config/server.properties
ExecStop=/usr/local/kafka-server/bin/kafka-server-stop.sh
Restart=on-abnormal

[Install]
WantedBy=multi-user.target

【问题讨论】:

  • 看看this。您是否尝试过增加 Kafka 内存?
  • KAFKA_HEAP_OPTS="-Xmx1G -Xms1G" ? @AchyutVyas
  • 是的,@Dima,我的 Kafka 配置与您的 Environmet="KAFKA_HEAP_OPTS=-Xmx1G -Xms1G" 相同,但我的 Kafka 服务消耗了大约 1.6G 内存。我对您的 Kafka 服务的内存消耗感到困惑。您可以发布有问题的 Kafka 服务文件吗?
  • 您可以在服务中添加和更改此配置 [Environmet="KAFKA_HEAP_OPTS=-Xmx1G -Xms1G"] 吗?
  • console-producer 也是一个 JVM 进程,它为自己分配大约 128Mb 和 512Mb 之间的空间。如果您因运行 Java 进程而受到内存限制,请坚持使用 kafkacat 或您的 Python 脚本

标签: python apache-kafka kafka-producer-api kafka-python


【解决方案1】:

查看 Kafka 服务状态 很明显你的 Kafka 服务在低内存(367.0 M)上运行


您可以验证 Timeout here 背后的原因。

加薪
KafkaTimeoutError – 如果无法获取主题元数据,或者无法在配置 max_block_ms 之前获取内存缓冲区


您可以通过更新 Kafka 服务文件从外部提供内存。

Environment="KAFKA_HEAP_OPTS=-Xmx1G -Xms1G"

最后重启kafka服务。

【讨论】:

  • 注意:默认已经是-Xmx1G -Xms1G
  • @OneCricketeer 是的,默认值为-Xmx1G -Xms1G,但内存不足的可能原因是什么? (而且 367 M 对于 Kafka 服务来说真的很低吗?)
  • 也许 367Mb 是操作系统看到的已用内存,而不是实际的 JVM 堆边界。顺便说一下,TimeoutError 在客户端,而不是代理内存。
猜你喜欢
  • 2020-09-23
  • 2019-06-14
  • 2018-06-23
  • 2019-03-18
  • 1970-01-01
  • 2018-04-06
  • 2016-03-19
  • 2019-08-28
  • 2016-10-03
相关资源
最近更新 更多