【问题标题】:Kafka consumer "Failed to add leader for partitions" error running on Mesos在 Mesos 上运行的 Kafka 消费者“无法为分区添加领导者”错误
【发布时间】:2015-12-22 08:45:10
【问题描述】:

我正在使用mesos/kafka 库运行一个由 6 个代理组成的 Kafka 集群。我能够在 6 台不同的机器上添加和启动代理,并使用 Python SimpleProducer 和 kafka-console-producer.sh 脚本将消息发布到集群中。

但是我无法让消费者正常工作。我正在运行以下消费者命令:

bin/kafka-console-consumer.sh --zookeeper 192.168.1.199:2181 --topic test --from-beginning --consumer.config config/consumer.properties --delete-consumer-offsets

在 consumer.properties 文件中,我将 group.id 设置为 my.group,并将 zookeeeper.connect 设置为 zookeeper 集合中的多个节点。我从运行此消费者中收到以下警告消息:

            [2015-09-24 16:01:06,609] WARN [my.group_my_host-1443106865779-b5a3a1e1-leader-finder-thread], Failed to add l
    eader for partitions [test,4],[test,1],[test,5],[test,2],[test,0],[test,3]; will retry (kafka.consumer.ConsumerFetcherM
    anager$LeaderFinderThread)
    java.nio.channels.ClosedChannelException
            at kafka.network.BlockingChannel.send(BlockingChannel.scala:100)
            at kafka.consumer.SimpleConsumer.liftedTree1$1(SimpleConsumer.scala:78)
            at kafka.consumer.SimpleConsumer.kafka$consumer$SimpleConsumer$$sendRequest(SimpleConsumer.scala:68)
            at kafka.consumer.SimpleConsumer.getOffsetsBefore(SimpleConsumer.scala:127)
            at kafka.consumer.SimpleConsumer.earliestOrLatestOffset(SimpleConsumer.scala:166)
            at kafka.consumer.ConsumerFetcherThread.handleOffsetOutOfRange(ConsumerFetcherThread.scala:60)
            at kafka.server.AbstractFetcherThread$$anonfun$addPartitions$2.apply(AbstractFetcherThread.scala:177)
            at kafka.server.AbstractFetcherThread$$anonfun$addPartitions$2.apply(AbstractFetcherThread.scala:172)
            at scala.collection.TraversableLike$WithFilter$$anonfun$foreach$1.apply(TraversableLike.scala:772)
            at scala.collection.immutable.Map$Map1.foreach(Map.scala:109)
            at scala.collection.TraversableLike$WithFilter.foreach(TraversableLike.scala:771)
            at kafka.server.AbstractFetcherThread.addPartitions(AbstractFetcherThread.scala:172)
            at kafka.server.AbstractFetcherManager$$anonfun$addFetcherForPartitions$2.apply(AbstractFetcherManager.scala:87)
            at kafka.server.AbstractFetcherManager$$anonfun$addFetcherForPartitions$2.apply(AbstractFetcherManager.scala:77)
            at scala.collection.TraversableLike$WithFilter$$anonfun$foreach$1.apply(TraversableLike.scala:772)
            at scala.collection.immutable.HashMap$HashMap1.foreach(HashMap.scala:224)
            at scala.collection.immutable.HashMap$HashTrieMap.foreach(HashMap.scala:403)
            at scala.collection.TraversableLike$WithFilter.foreach(TraversableLike.scala:771)
            at kafka.server.AbstractFetcherManager.addFetcherForPartitions(AbstractFetcherManager.scala:77)
            at kafka.consumer.ConsumerFetcherManager$LeaderFinderThread.doWork(ConsumerFetcherManager.scala:95)
            at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:60)
    {'some':2}
    [2015-09-24 16:20:02,362] WARN [my.group_my_host-1443108001180-fa0c93e4-leader-finder-thread], Failed to add leader for partitions [test,4],[test,1],[test,5],[test,2],[test,0],[test,3]; will retry (kafka.consumer.ConsumerFetcherManager$LeaderFinderThread)
    java.nio.channels.ClosedChannelException
            at kafka.network.BlockingChannel.send(BlockingChannel.scala:100)
            at kafka.consumer.SimpleConsumer.liftedTree1$1(SimpleConsumer.scala:78)
            at kafka.consumer.SimpleConsumer.kafka$consumer$SimpleConsumer$$sendRequest(SimpleConsumer.scala:68)
            at kafka.consumer.SimpleConsumer.getOffsetsBefore(SimpleConsumer.scala:127)
            at kafka.consumer.SimpleConsumer.earliestOrLatestOffset(SimpleConsumer.scala:166)
            at kafka.consumer.ConsumerFetcherThread.handleOffsetOutOfRange(ConsumerFetcherThread.scala:60)
            at kafka.server.AbstractFetcherThread$$anonfun$addPartitions$2.apply(AbstractFetcherThread.scala:177)
            at kafka.server.AbstractFetcherThread$$anonfun$addPartitions$2.apply(AbstractFetcherThread.scala:172)
            at scala.collection.TraversableLike$WithFilter$$anonfun$foreach$1.apply(TraversableLike.scala:772)
            at scala.collection.immutable.Map$Map1.foreach(Map.scala:109)
            at scala.collection.TraversableLike$WithFilter.foreach(TraversableLike.scala:771)
            at kafka.server.AbstractFetcherThread.addPartitions(AbstractFetcherThread.scala:172)
            at kafka.server.AbstractFetcherManager$$anonfun$addFetcherForPartitions$2.apply(AbstractFetcherManager.scala:87)
            at kafka.server.AbstractFetcherManager$$anonfun$addFetcherForPartitions$2.apply(AbstractFetcherManager.scala:77)
            at scala.collection.TraversableLike$WithFilter$$anonfun$foreach$1.apply(TraversableLike.scala:772)
            at scala.collection.immutable.HashMap$HashMap1.foreach(HashMap.scala:224)
            at scala.collection.immutable.HashMap$HashTrieMap.foreach(HashMap.scala:403)
            at scala.collection.TraversableLike$WithFilter.foreach(TraversableLike.scala:771)
            at kafka.server.AbstractFetcherManager.addFetcherForPartitions(AbstractFetcherManager.scala:77)
            at kafka.consumer.ConsumerFetcherManager$LeaderFinderThread.doWork(ConsumerFetcherManager.scala:95)
            at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:60)
    ...
    // Lots more of this
    ...
    Consumed 1 messages

我不确定为什么无法添加领导者,领导者似乎已经在 Zookeeper 中。除了所有这些错误消息,我只能将一条消息传递给消费者。字符串{'some':2} 是我从控制台生产者发送的消息。

我在其中一个 Mesos 从站的server.log 中发现了这个错误,不确定是否相关:

[2015-09-24 17:09:41,926] ERROR Closing socket for /192.168.1.199 because of error (kafka.network.Processor)
java.io.IOException: Broken pipe
            at sun.nio.ch.FileDispatcherImpl.write0(Native Method)
            at sun.nio.ch.SocketDispatcher.write(SocketDispatcher.java:47)
            at sun.nio.ch.IOUtil.writeFromNativeBuffer(IOUtil.java:93)
            at sun.nio.ch.IOUtil.write(IOUtil.java:65)
            at sun.nio.ch.SocketChannelImpl.write(SocketChannelImpl.java:471)
            at kafka.api.TopicDataSend.writeTo(FetchResponse.scala:123)
            at kafka.network.MultiSend.writeTo(Transmission.scala:101)
            at kafka.api.FetchResponseSend.writeTo(FetchResponse.scala:231)
            at kafka.network.Processor.write(SocketServer.scala:472)
            at kafka.network.Processor.run(SocketServer.scala:342)
            at java.lang.Thread.run(Thread.java:745)

关于消费者可能发生的事情或我可以在哪里解决问题的任何建议?

其中一个日志分区的 Zookeeper 代理分区状态:

[zk: localhost:2181(CONNECTED) 166] get /brokers/topics/test/partitions/0/state
{"controller_epoch":1,"leader":0,"version":1,"leader_epoch":0,"isr":[0]}

操作系统:Ubuntu 14.0.4 金币:0.23 卡夫卡:2.10-0.8.2.1

更新: 使用 kafka-console-consumer.sh 进行一些进一步测试,消息似乎确实通过了。错误消息是不变的,因此您看不到stdout 中的所有消息。 Python KafkaConsumer 立即失败并返回 FailedPayloadsError

【问题讨论】:

    标签: apache-kafka apache-zookeeper mesos


    【解决方案1】:

    我认为您需要查看属性“advertised.host.name”的价值。我最近也遇到了这个问题并使用上述属性进行了修复。
    请确保您为每个 BROKER 提供了正确的 IP 地址。
    如果不起作用,请告诉我。

    【讨论】:

    • 感谢@Bector 的回复。 mesos/kafka 调度程序似乎不允许我设置这个值。我在添加代理时放入了一个 server.properties 文件,但它似乎并没有真正设置。即使使用该库设置端口也不适合我。我会在他们的 Github 页面上提出问题。
    • 它可能不允许您修改它,但是您从 server.properties 设置的属性正在从某些地方被覆盖,这就是您无法看到修改后的更改的原因。很高兴您提出了这个问题,但您应该能够修改此属性。
    【解决方案2】:

    尝试运行以下命令:

    bin/kafka-topics.sh --zookeeper your.zookeeper:2181 --describe --topic your_topic
    

    这将显示哪个代理是每个主题分区的领导者(有关更多详细信息,请参阅此链接:http://kafka.apache.org/documentation.html#quickstart_multibroker

    在我的情况下,被设置为领导者的代理之一失败并且不再存在。应该指定一个新的领导,但由于某种原因它没有。

    我通过以下方式解决了这个问题:

    1. 停止所有生产者和消费者
    2. 重新启动每个剩余的代理

    然后我重新运行describe 命令(从上面),可以看到失败的代理不再被列为领导者。

    然后我提出了一个与失败的代理具有相同 ID 的新代理。 Kafka 从那里获取它并从我的其他代理那里带来所有数据(这要求您的主题具有足够的复制因子)。数据结束后,Kafka 让 broker 成为分区负责人。

    最后,我重新启动了生产者和消费者。

    【讨论】:

    • 我目前面临这个问题,当我尝试你的命令时,我得到 Topic:MY_TOPIC PartitionCount:1 ReplicationFactor:1 Configs: Topic: MY_TOPIC Partition: 0 Leader: -1 Replicas: 1012 Isr:
    • @Ratha 你找到解决方法了吗?我有同样的问题。
    • @user2441441 不,我不能,但我升级到 kafka 0.10.0.1,并使用高级消费者/生产者 API。现在一切正常
    【解决方案3】:

    我的问题是:

    1. 运行动物园管理员
    2. 已创建主题
    3. 运行卡夫卡

    然后我得到“No leader found exception”

    但是当我在 Zookeeper 和 Kafka 正常运行时创建主题时,它工作正常。

    【讨论】:

      猜你喜欢
      • 2014-07-14
      • 2021-11-08
      • 2017-01-04
      • 1970-01-01
      • 1970-01-01
      • 2018-09-22
      • 2013-03-02
      • 1970-01-01
      • 2017-10-17
      相关资源
      最近更新 更多