【发布时间】:2019-07-21 06:20:36
【问题描述】:
我尝试使用 Kafka @ Wurstmeister 设置 docker-compose。
场景: 我开发了多个微服务的架构。具体来说:我有一个 Spring Boot 应用程序,可以将 JSON 发送到我的 kafka 代理。 Flask 服务使用数据。 这在 docker 之外运行整个想法时有效。我还可以将数据发送到 docker 中的 kafka 主题。
代码: 烧瓶:
KafkaHost = "kafka:9092"
def initkafka():
# connect to Kafka server and pass the topic we want to consume
consumer = KafkaConsumer("TEST",
group_id='view',
bootstrap_servers=[Constants.KafkaHost]
)
KafkaConsumer(auto_offset_reset='latest',
enable_auto_commit=False)
KafkaConsumer(value_deserializer=lambda m: json.loads(m.dedoce('utf-8')))
KafkaConsumer(consumer_timeout_ms=1000)
return consumer
Docker 编写:
zookeeper:
image: wurstmeister/zookeeper
ports:
- "2181:2181"
networks:
- test-net
kafka:
image: wurstmeister/kafka
ports:
- "9092:9092"
environment:
#KAFKA_ADVERTISED_HOST_NAME: 172.17.0.1
KAFKA_LISTENERS: PLAINTEXT://:9092
KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://172.17.0.1:9092
KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
KAFKA_CREATE_TOPICS: "TEST:1:1"
volumes:
- /var/run/docker.sock:/var/run/docker.sock
depends_on:
- zookeeper
networks:
- test-net
错误
Traceback (most recent call last):
File "run.py", line 1, in <module>
from controller import Controller
File "/app/controller/Controller.py", line 27, in <module>
consumer = KafkaConfig.initkafka()
File "/app/config/KafkaConfig.py", line 16, in initkafka
enable_auto_commit=False)
File "/usr/local/lib/python3.6/site-packages/kafka/consumer/group.py", line 324, in __init__
self._client = KafkaClient(metrics=self._metrics, **self.config)
File "/usr/local/lib/python3.6/site-packages/kafka/client_async.py", line 221, in __init__
self.config['api_version'] = self.check_version(timeout=check_timeout)
File "/usr/local/lib/python3.6/site-packages/kafka/client_async.py", line 826, in check_version
raise Errors.NoBrokersAvailable()
kafka.errors.NoBrokersAvailable: NoBrokersAvailable
我认为是环境配置的问题。我已经阅读了 wurstmeister 文档,但我无法弄清楚我需要设置什么才能让我的烧瓶服务找到卡夫卡经纪人。 日志说 kafka 正在运行,并且创建了“TEST”主题。 我是否必须配置侦听器,例如说我网络中的 ip 和端口会听 kafka 吗?因为在kafka docsadverted.listeners 中被描述为
要发布到 ZooKeeper 以供客户端使用的侦听器(如果与侦听器配置属性不同)。在 IaaS 环境中,这可能需要与代理绑定的接口不同。如果未设置,将使用侦听器的值。与侦听器不同,通告 0.0.0.0 元地址是无效的。
【问题讨论】:
-
你有烧瓶应用作为容器还是直接部署在你的本地主机上?
-
两者。我只用烧瓶服务尝试了我的撰写文件,它是 mongodb 实例。我禁用了 kafka 连接,它可以工作。但是当我启用 Kafka 连接时,我得到了错误。使用 kafka 运行整个项目(涉及的所有服务)时,它也可以正常工作。因此,要在我的容器中运行它,我只需将“localhost”更改为我的撰写文件中的服务名称
-
来自烧瓶容器,如果您可以从
ping kafka获得阳性结果,然后尝试telnet kafka 9092如果您的连接被拒绝,请确保 kafka 已绑定在0.0.0.0上,以便其他容器可以访问它 -
设置以下环境变量为
KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://0.0.0.0:9092
标签: docker apache-kafka docker-compose