【Question title】: kafka-python raises UnrecognizedBrokerVersion error
【Posted】: 2020-02-26 14:50:37
【Question description】:

I get this error when constructing a KafkaProducer with the kafka-python package:

[ERROR] UnrecognizedBrokerVersion: UnrecognizedBrokerVersion
Traceback (most recent call last):
  File "/var/lang/lib/python3.7/imp.py", line 234, in load_module
    return load_source(name, filename, file)
  File "/var/lang/lib/python3.7/imp.py", line 171, in load_source
    module = _load(spec)
  File "<frozen importlib._bootstrap>", line 696, in _load
  File "<frozen importlib._bootstrap>", line 677, in _load_unlocked
  File "<frozen importlib._bootstrap_external>", line 728, in exec_module
  File "<frozen importlib._bootstrap>", line 219, in _call_with_frames_removed
  File "/var/task/kafka/producer/kafka.py", line 381, in __init__
    **self.config)
  File "/var/task/kafka/client_async.py", line 240, in __init__
    self.config['api_version'] = self.check_version(timeout=check_timeout)
  File "/var/task/kafka/client_async.py", line 908, in check_version
    version = conn.check_version(timeout=remaining, strict=strict, topics=list(self.config['bootstrap_topics_filter']))
  File "/var/task/kafka/conn.py", line 1228, in check_version
    raise Errors.UnrecognizedBrokerVersion()

The code is as follows:

import os

from kafka import KafkaProducer

producer = KafkaProducer(bootstrap_servers=os.environ.get('KAFKA_HOST', 'localhost:9092'))

I am using Python 3.7 and an AWS MSK cluster.

【Question discussion】:

Tags: python apache-kafka kafka-python aws-msk


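【Solution 1】:

The kafka-python library's methods take many optional parameters. Here is a recent working script with which I was able to listen to an Azure server: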

import datetime
import json
import os

import kafka  # pip install kafka-python

consumer = kafka.KafkaConsumer("test-topic",
    bootstrap_servers=["test-server.servicebus.windows.net:9093"],
    auto_offset_reset="earliest",
    enable_auto_commit=True,
    group_id="$Default",
    sasl_mechanism="PLAIN",
    sasl_plain_password=os.environ["KAFKA_PASSWORD"],
    sasl_plain_username="$ConnectionString",
    security_protocol="SASL_SSL",
    value_deserializer=lambda x: x.decode("utf-8"))

print(datetime.datetime.now())
for message in consumer:
    message_dict = json.loads(message.value)
    print(datetime.datetime.now())
    print("%s" % (json.dumps(message_dict,indent=4)))

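It's crude, but it works. It is simpler than the Java-wrapped-in-bash Kafka demo consumer on the https://kafka.apache.org/quickstart page (which needs various config files and environment variables).

【Discussion】:

【Solution 2】:

Simply adding security_protocol="SSL" to the KafkaProducer solved it, as follows: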

import os

from kafka import KafkaProducer

producer = KafkaProducer(security_protocol="SSL", bootstrap_servers=os.environ.get('KAFKA_HOST', 'localhost:9092'))
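
If the same code has to run against both TLS and plaintext listeners, the protocol can be read from the environment instead of being hard-coded. This is only a minimal sketch: the KAFKA_SECURITY_PROTOCOL variable and the build_producer_config and make_producer helpers are assumptions made for illustration, not part of the original answer or of kafka-python. The default of "SSL" simply mirrors the fix above.

```python
import os

# Values kafka-python accepts for the security_protocol argument.
VALID_PROTOCOLS = {"PLAINTEXT", "SSL", "SASL_PLAINTEXT", "SASL_SSL"}


def build_producer_config(env=None):
    """Build KafkaProducer keyword arguments from environment variables.

    KAFKA_SECURITY_PROTOCOL is a hypothetical variable name chosen for this sketch.
    """
    env = os.environ if env is None else env
    protocol = env.get("KAFKA_SECURITY_PROTOCOL", "SSL").upper()
    if protocol not in VALID_PROTOCOLS:
        raise ValueError("unsupported security_protocol: %r" % protocol)
    servers = env.get("KAFKA_HOST", "localhost:9092")
    return {
        # Accept a comma-separated list of host:port pairs.
        "bootstrap_servers": [s.strip() for s in servers.split(",") if s.strip()],
        "security_protocol": protocol,
    }


def make_producer(env=None):
    """Create the producer; kafka is imported lazily so the helper above has no dependency."""
    from kafka import KafkaProducer  # pip install kafka-python
    return KafkaProducer(**build_producer_config(env))
```

Calling make_producer() with no arguments uses os.environ; passing a plain dict makes the configuration easy to check without a broker.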
    

【Discussion】:

• If I'm not using SSL, can I use PLAINTEXT? After running: import os; from kafka import KafkaConsumer; producer = KafkaConsumer(security_protocol="PLAINTEXT", bootstrap_servers=os.environ.get('KAFKA_HOST', 'localhost:2181')) I get the error: raise Errors.UnrecognizedBrokerVersion() kafka.errors.UnrecognizedBrokerVersion: UnrecognizedBrokerVersion. How can I fix it?
• @NikolayBaranenko Change your port from 2181 to 9092.
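
The port advice in the last comment can be turned into a small pre-flight check. 2181 is ZooKeeper's default client port, while Kafka brokers listen on 9092 by default, so a bootstrap address ending in 2181 is probably not a broker. The sketch below is a heuristic based on default ports only, and check_bootstrap_ports is a hypothetical helper, not a kafka-python function.

```python
ZOOKEEPER_DEFAULT_PORT = 2181  # ZooKeeper's default client port, not a Kafka broker port


def check_bootstrap_ports(bootstrap_servers):
    """Return warnings for host:port entries that look like ZooKeeper addresses."""
    if isinstance(bootstrap_servers, str):
        bootstrap_servers = bootstrap_servers.split(",")
    warnings = []
    for entry in bootstrap_servers:
        # rpartition keeps everything before the last colon as the host.
        host, _, port = entry.strip().rpartition(":")
        if port.isdigit() and int(port) == ZOOKEEPER_DEFAULT_PORT:
            warnings.append(
                "%s:%s looks like a ZooKeeper address; "
                "bootstrap_servers should point at a Kafka broker port" % (host, port)
            )
    return warnings
```

Running the check on the value of KAFKA_HOST before constructing the consumer or producer gives a readable message instead of the bare UnrecognizedBrokerVersion traceback.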