【问题标题】:Can't send to Kafka from NiFi无法从 NiFi 发送到 Kafka
【发布时间】:2018-11-07 10:14:33
【问题描述】:

我在 Docker for Windows 中运行,这是我的 NiFi 设置:

PublishKafka 处理器的详细信息:

ConsumeKafka 处理器的详细信息:

这是我的 docker-compose 文件(注意:192.168.1.50 是我的静态内部主机 IP):

version: '3'
services:
  Jenkins:
    container_name: Jenkins
    restart: on-failure
    depends_on:
    - NiFi
    image: jenkins:latest
    ports:
      - "32779:50000"
      - "32780:8080"
  NiFi:
    container_name: NiFi
    image: xemuliam/nifi:latest
    restart: on-failure
    depends_on:
    - kafka
    ports:
      - "32784:8089"
      - "32783:8080"
      - "32782:8081"
      - "32781:8443"
    labels:
      com.foo: myLabel
  zookeeper:
    container_name: Zookeeper
    image: wurstmeister/zookeeper
    restart: on-failure
    #network_mode: host
    ports:
      - "2181:2181"
  kafka:
    #container_name: Kafka
    image: wurstmeister/kafka
    depends_on:
    - zookeeper
    #restart: on-failure
    #network_mode: host
    ports:
      - "9092"
    environment:
      #KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://192.168.1.50:9092
      #KAFKA_AUTO_CREATE_TOPICS_ENABLE: "true"
      
      KAFKA_CREATE_TOPICS: "MainIngestionTopic:1:1"
      KAFKA_ZOOKEEPER_CONNECT: 192.168.1.50:2181
      KAFKA_ADVERTISED_LISTENERS: INSIDE://:9092,OUTSIDE://192.168.1.50:9094
      KAFKA_LISTENERS: INSIDE://:9092,OUTSIDE://:9094
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: INSIDE:PLAINTEXT,OUTSIDE:PLAINTEXT
      KAFKA_INTER_BROKER_LISTENER_NAME: INSIDE
      
    volumes:
      - ./var/run/docker.sock:/var/run/docker.sock

当我跟踪 Kafka 容器日志时,我可以看到我的主题是从 docker-compose 成功创建的。

消息已成功传递到 NiFi 中的 PublishKafka 处理器,但随后无法发布。订阅同一个主题的 ConsumeKafka 处理器永远不会收到消息。

NiFi 容器日志显示如下:

2018-05-28 19:46:18,792 ERROR [Timer-Driven Process Thread-1] o.a.n.p.kafka.pubsub.PublishKafka PublishKafka[id=b2503f49-acc9-38f5-86f9-5029e2768b68] Failed to send all message for StandardFlowFileRecord[uuid=b3f6f818-34d3-42a9-9d6e-636cf17eb138,claim=StandardContentClaim [resourceClaim=StandardResourceClaim[id=1527533792820-1, container=default, section=1], offset=5, length=5],offset=0,name=8151630985100,size=5] to Kafka; routing to failure due to org.apache.kafka.common.errors.TimeoutException: Failed to update metadata after 5000 ms.: org.apache.kafka.common.errors.TimeoutException: Failed to update metadata after 5000 ms.


org.apache.kafka.common.errors.TimeoutException: Failed to update metadata after 5000 ms.


2018-05-28 19:46:18,792 INFO [Timer-Driven Process Thread-1] o.a.kafka.clients.producer.KafkaProducer Closing the Kafka producer with timeoutMillis = 5000 ms.

我尝试从 Kafka 容器内部发布到主题,但也失败了:

我已经梳理了文档并阅读了许多试图解决此问题的线程,但这仍然是一个问题。任何帮助将不胜感激!

【问题讨论】:

  • 嗨。我不确定为什么消费者无法阅读,但您需要将ConsumeKafka 连接到下游处理器才能实际接收消息。如果您将 ConsumeKafka 的 success 关系设置为自动终止,您将不会收到消息。
  • 嗨。您是否尝试使用您的 IP 192.168.1.50 而不是 localhost

标签: docker apache-kafka docker-compose apache-nifi kafka-producer-api


【解决方案1】:

您不能在 NiFi 的“Kafka Brokers”属性中使用 localhost,除非代理实际上在运行 NiFi 的同一主机上运行。由于每个服务都在 docker 容器中,因此 kafka 的容器必须具有可以使用的特定主机名或 ip。

【讨论】:

  • 这完全有道理。我更新了 PublishKafka 中的 Kafka Brokers 设置,以查看 kafka 机器而不是 localhost。这让我摆脱了元数据更新错误,但现在我收到“org.apache.kafka.common.errors.TimeoutException: Batch Expired”
猜你喜欢
  • 2021-08-14
  • 2020-07-17
  • 1970-01-01
  • 2019-08-28
  • 2022-12-16
  • 1970-01-01
  • 2020-04-21
  • 2016-12-25
  • 2022-10-24
相关资源
最近更新 更多