【问题标题】:Error Docker compose configure listener for kafka Service. No Broker available错误 Docker compose 为 kafka 服务配置侦听器。没有可用的经纪人
【发布时间】:2019-07-21 06:20:36
【问题描述】:

我尝试使用 Kafka @ Wurstmeister 设置 docker-compose。

场景: 我开发了多个微服务的架构。具体来说:我有一个 Spring Boot 应用程序,可以将 JSON 发送到我的 kafka 代理。 Flask 服务使用数据。 这在 docker 之外运行整个想法时有效。我还可以将数据发送到 docker 中的 kafka 主题。

代码: 烧瓶:

KafkaHost = "kafka:9092"
def initkafka():
    # connect to Kafka server and pass the topic we want to consume
    consumer = KafkaConsumer("TEST",
                             group_id='view',
                             bootstrap_servers=[Constants.KafkaHost]
                             )
    KafkaConsumer(auto_offset_reset='latest',
                  enable_auto_commit=False)

    KafkaConsumer(value_deserializer=lambda m: json.loads(m.dedoce('utf-8')))
    KafkaConsumer(consumer_timeout_ms=1000)
    return consumer

Docker 编写:

zookeeper:
    image: wurstmeister/zookeeper
    ports:
      - "2181:2181"
    networks:
      - test-net

  kafka:
    image: wurstmeister/kafka
    ports:
      - "9092:9092"
    environment:
      #KAFKA_ADVERTISED_HOST_NAME: 172.17.0.1
      KAFKA_LISTENERS: PLAINTEXT://:9092
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://172.17.0.1:9092
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
      KAFKA_CREATE_TOPICS: "TEST:1:1"
    volumes:
      - /var/run/docker.sock:/var/run/docker.sock
    depends_on:
      - zookeeper
    networks:
      - test-net

错误

Traceback (most recent call last):
  File "run.py", line 1, in <module>
    from controller import Controller
  File "/app/controller/Controller.py", line 27, in <module>
    consumer = KafkaConfig.initkafka()
  File "/app/config/KafkaConfig.py", line 16, in initkafka
    enable_auto_commit=False)
  File "/usr/local/lib/python3.6/site-packages/kafka/consumer/group.py", line 324, in __init__
    self._client = KafkaClient(metrics=self._metrics, **self.config)
  File "/usr/local/lib/python3.6/site-packages/kafka/client_async.py", line 221, in __init__
    self.config['api_version'] = self.check_version(timeout=check_timeout)
  File "/usr/local/lib/python3.6/site-packages/kafka/client_async.py", line 826, in check_version
    raise Errors.NoBrokersAvailable()
kafka.errors.NoBrokersAvailable: NoBrokersAvailable

我认为是环境配置的问题。我已经阅读了 wurstmeister 文档,但我无法弄清楚我需要设置什么才能让我的烧瓶服务找到卡夫卡经纪人。 日志说 kafka 正在运行,并且创建了“TEST”主题。 我是否必须配置侦听器,例如说我网络中的 ip 和端口会听 kafka 吗?因为在kafka docsadverted.listeners 中被描述为

要发布到 ZooKeeper 以供客户端使用的侦听器(如果与侦听器配置属性不同)。在 IaaS 环境中,这可能需要与代理绑定的接口不同。如果未设置,将使用侦听器的值。与侦听器不同,通告 0.0.0.0 元地址是无效的。

【问题讨论】:

  • 你有烧瓶应用作为容器还是直接部署在你的本地主机上?
  • 两者。我只用烧瓶服务尝试了我的撰写文件,它是 mongodb 实例。我禁用了 kafka 连接,它可以工作。但是当我启用 Kafka 连接时,我得到了错误。使用 kafka 运行整个项目(涉及的所有服务)时,它也可以正常工作。因此,要在我的容器中运行它,我只需将“localhost”更改为我的撰写文件中的服务名称
  • 来自烧瓶容器,如果您可以从ping kafka 获得阳性结果,然后尝试telnet kafka 9092 如果您的连接被拒绝,请确保 kafka 已绑定在 0.0.0.0 上,以便其他容器可以访问它
  • 设置以下环境变量为KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://0.0.0.0:9092

标签: docker apache-kafka docker-compose


【解决方案1】:

除非我弄错了,KAFKA_ADVERTISED_LISTENERS 需要与您在烧瓶客户端中定义的 kafka 主机具有相同的值。因此,如果您从 docker 容器内连接到 Kafka,您应该拥有KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:9092。如果从主机连接,应该是KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://localhost:9092

或者,您可以省略KAFKA_ADVERTISED_LISTENERS 设置并改为定义KAFKA_ADVERTISED_HOST_NAME: kafka

【讨论】:

  • 来自容器内部和主机的连接对您来说究竟意味着什么。我认为这是发布订阅。所以我的 Kafka Container 在我的 docker 网络中提供了一个 host:port 来订阅阅读主题。该主机应该是我的 docker 服务的 de DNS 条目(服务名称)吗?当我使用`KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:9092,PLAINTEXT://localhost:9092`时,我得到java.lang.IllegalArgumentException: requirement failed: Each listener must have a different port, listeners: PLAINTEXT://kafka:9092,PLAINTEXT://localhost:9092
  • 好的,那么我将编辑我的答案。我的观点是,广告的侦听器必须与消费者使用的主机名相对应。所以,是的,当flask在docker中运行时,广播的监听器等于docker服务的DNS条目,也就是:“kafka”。
  • 好的,由于错误 NoBrokersAvailable 和日志,我确信该主题已创建。还有什么我可以检查配置的吗?
  • 对不起,我不明白你的问题:-/建议的设置对你有用吗?
  • 对不起,我的评论有点混乱。我只是想说,在查看日志后,我确定这一定是监听器配置的问题。
【解决方案2】:

所以我已经通过@cricket_007 提到的this。现在有点清楚了,但我仍然很难获得连接。 回顾一下我的场景:我在同一个 Docker 网络中运行我的所有服务和消息代理。因此,无需进行外部连接。 在此博客条目中给出了一个示例:

KAFKA_LISTENERS: LISTENER_BOB://kafka0:29092,LISTENER_FRED://localhost:9092
KAFKA_ADVERTISED_LISTENERS: LISTENER_BOB://kafka0:29092,LISTENER_FRED://localhost:9092
KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: LISTENER_BOB:PLAINTEXT,LISTENER_FRED:PLAINTEXT
KAFKA_INTER_BROKER_LISTENER_NAME: LISTENER_BOB

我想我知道这个配置意味着什么。 就我而言,我认为我必须这样改变它:

      KAFKA_LISTENERS: LISTENER_PY://kafka:9092
      KAFKA_ADVERTISED_LISTENERS: LISTENER_PY://kafka:9092
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: LISTENER_PY:PLAINTEXT
      KAFKA_INTER_BROKER_LISTENER_NAME: LISTENER_PY

我想 KAFKA_INTER_BROKER_LISTENER_NAME 是不需要的,因为我只有一个经纪人。但是侦听器名称( LISTENER_PY )是否取决于我的 Flask 服务名称或任何其他属性?据我了解,我可以使用“kafka”作为 ip,因为我在 docker-compose 中将 Kafka 作为名为“kafka”的服务运行。我尝试了这种配置,但仍然无法正常工作。我想知道它是如何在我的 spring 服务中作为生产者连接而不定义任何侦听器配置的。

【讨论】:

    猜你喜欢
    • 2019-01-28
    • 2017-08-17
    • 1970-01-01
    • 2023-02-15
    • 2021-05-23
    • 1970-01-01
    • 2021-09-13
    • 2017-12-29
    • 2018-02-19
    相关资源
    最近更新 更多