【问题标题】:Kafka Consumer: No entry found for connection卡夫卡消费者:没有找到连接条目
【发布时间】:2019-06-23 21:56:12
【问题描述】:

我正在尝试通过使用来自远程 Kafka 集群上某个主题的数据来检查 kafka 消费者。使用kafka-console-consumer.sh 时出现以下错误:

 ERROR Error processing message, terminating consumer process:  (kafka.tools.ConsoleConsumer$)
java.lang.IllegalStateException: No entry found for connection 2147475658
    at org.apache.kafka.clients.ClusterConnectionStates.nodeState(ClusterConnectionStates.java:330)
    at org.apache.kafka.clients.ClusterConnectionStates.disconnected(ClusterConnectionStates.java:134)
    at org.apache.kafka.clients.NetworkClient.initiateConnect(NetworkClient.java:885)
    at org.apache.kafka.clients.NetworkClient.ready(NetworkClient.java:276)
    at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.tryConnect(ConsumerNetworkClient.java:548)
    at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$FindCoordinatorResponseHandler.onSuccess(AbstractCoordinator.java:655)
    at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$FindCoordinatorResponseHandler.onSuccess(AbstractCoordinator.java:635)
    at org.apache.kafka.clients.consumer.internals.RequestFuture$1.onSuccess(RequestFuture.java:204)
    at org.apache.kafka.clients.consumer.internals.RequestFuture.fireSuccess(RequestFuture.java:167)
    at org.apache.kafka.clients.consumer.internals.RequestFuture.complete(RequestFuture.java:127)
    at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient$RequestFutureCompletionHandler.fireCompletion(ConsumerNetworkClient.java:575)
    at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.firePendingCompletedRequests(ConsumerNetworkClient.java:389)
    at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:297)
    at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:236)
    at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:215)
    at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureCoordinatorReady(AbstractCoordinator.java:231)
    at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:316)
    at org.apache.kafka.clients.consumer.KafkaConsumer.updateAssignmentMetadataIfNeeded(KafkaConsumer.java:1214)
    at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1179)
    at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1164)
    at kafka.tools.ConsoleConsumer$ConsumerWrapper.receive(ConsoleConsumer.scala:436)
    at kafka.tools.ConsoleConsumer$.process(ConsoleConsumer.scala:104)
    at kafka.tools.ConsoleConsumer$.run(ConsoleConsumer.scala:76)
    at kafka.tools.ConsoleConsumer$.main(ConsoleConsumer.scala:54)
    at kafka.tools.ConsoleConsumer.main(ConsoleConsumer.scala)
Processed a total of 0 messages

这是我使用的命令:

./bin/kafka-console-consumer.sh --bootstrap-server SSL://{IP}:{PORT},SSL://{IP}:{PORT},SSL://{IP}:{PORT} --consumer.config ./config/consumer.properties --topic MYTOPIC --group MYGROUP

这是./config/consumer.properties 文件:

bootstrap.servers=SSL://{IP}:{PORT},SSL://{IP}:{PORT},SSL://{IP}:{PORT}

# consumer group id
group.id=MYGROUP

# What to do when there is no initial offset in Kafka or if the current
# offset does not exist any more on the server: latest, earliest, none
auto.offset.reset=earliest

#### Security
security.protocol=SSL
ssl.key.password=test1234
ssl.keystore.location=/opt/kafka/config/certs/keystore.jks
ssl.keystore.password=test1234
ssl.truststore.location=/opt/kafka/config/certs/truststore.jks
ssl.truststore.password=test1234

你知道问题是什么吗?

【问题讨论】:

    标签: apache-kafka kafka-consumer-api


    【解决方案1】:

    在我的情况下,我在尝试连接到我的 Kafka 容器时收到该消息,我必须通过以下内容:

    -e KAFKA_LISTENERS=PLAINTEXT://0.0.0.0:9092

    -e KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://localhost:9092

    希望对某人有所帮助

    【讨论】:

      【解决方案2】:

      在将 Kafka 和 Zookeeper 作为 Docker 容器运行时,我遇到了这个问题(消费者和生产者)。

      解决方案是在Kafka brokers的config/server.properties文件中设置advertised.listeners,使其包含容器的IP地址,例如

      advertised.listeners=PLAINTEXT://172.15.0.8:9092
      

      有关在正确设置属性文件后用于在容器内启动 Kafka 的脚本示例,请参阅 https://github.com/maxant/kafkaplayground/blob/master/start-kafka.sh

      【讨论】:

      • 如果您可以访问主 Kafka 可以这样做,但如果您只是消费者或生产者并且无法访问 Kafka 集群,则最好在您的 DNS 上进行设置。
      • 同理,可以使用容器名代替ip地址。然后在您的主机文件中为容器名称添加一个别名。具有相同的效果,但在使用具有不同 IP 地址的容器时不会中断
      【解决方案3】:

      我发现了问题。最后是DNS问题。我通过 IP 地址联系了 Kafka 代理,但代理使用 DNS 名称回复。在消费者端设置 DNS 名称后,它又开始工作了。

      【讨论】:

      • 您能详细解释一下您是如何做到的吗?我也有类似的问题。
      • 可以在hosts文件中设置本地dns解析。例如,如果我使用 macOS,我会在 /etc/hosts 中添加一行,“your_kafka_host_name 1.2.3.4”。其他操作系统也一样。
      【解决方案4】:

      您确定远程 kafka 正在运行。我建议运行nmap -p PORT HOST 以验证端口是否打开(除非配置不同,端口应为 9092)。如果没问题,那么您可以使用 kafkacat 让事情变得更容易。创建一个运行 kafkacat -b HOST:PORT -t YOUR_TOPIC -C -o beginning 的消费者或创建一个运行 kafkacat -b HOST:PORT 的生产者 -t YOUR_TOPIC -P

      【讨论】:

        【解决方案5】:

        似乎没有在 server.properties 中配置 Kafka 集群侦听器属性。

        在远程 kafka 集群中,此属性应使用正确的主机名取消注释。

        listeners=PLAINTEXT://0.0.0.0:9092
        

        【讨论】:

        • Kafka 集群正常。人们可以在同一台机器上使用它。问题是我无法得到回复。
        猜你喜欢
        • 2020-07-24
        • 1970-01-01
        • 2018-12-06
        • 2019-08-06
        • 2019-09-19
        • 2020-03-14
        • 2019-07-03
        • 2018-05-05
        • 2021-08-22
        相关资源
        最近更新 更多