【问题标题】:Consumer not receiving messages, kafka console, new consumer api, Kafka 0.9消费者未收到消息,kafka 控制台,新消费者 api,Kafka 0.9
【发布时间】:2020-08-21 20:06:33
【问题描述】:

我正在为 Kafka 0.9.0.0 执行 Kafka Quickstart

我让 zookeeper 在 localhost:2181 收听,因为我跑了

bin/zookeeper-server-start.sh config/zookeeper.properties

我有一个经纪人在 localhost:9092 监听,因为我跑了

bin/kafka-server-start.sh config/server.properties

我有一个制作人发布到主题“测试”,因为我跑了

bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test
yello
is this thing on?
let's try another
gimme more

当我运行旧的 API 使用者时,它工作通过运行

bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic test --from-beginning

但是,当我运行新的 API 使用者时,我什么也得不到

bin/kafka-console-consumer.sh --new-consumer --topic test --from-beginning \
    --bootstrap-server localhost:9092

是否可以使用新的 api 从控制台消费者订阅主题?我该如何解决?

【问题讨论】:

  • 你使用的是什么 scala 版本?你编译编译kafka了吗?我在使用 kafka_2.10-0.9.0.0.tgz 时遇到了一些小问题,但是使用 kafka_2.101-0.9.0.0.tgz 时,它就像一个魅力,包括您的示例。
  • 好的,谢谢,这是 2.10 的。如果我再试一次,它将是 2.11。
  • 您是否创建了“测试”主题?

标签: apache-kafka kafka-consumer-api


【解决方案1】:

在我的 MAC 机器上,我遇到了同样的问题,即控制台消费者在使用命令时不消费任何消息

kafka-console-consumer --bootstrap-server localhost:9095 --from-beginning --topic my-replicated-topic

但是当我尝试使用

kafka-console-consumer --bootstrap-server localhost:9095 --from-beginning --topic my-replicated-topic --partition 0

它愉快地列出了发送的消息。这是 Kafka 1.10.11 中的错误吗?

【讨论】:

  • 这对我也有用..但是为什么我们必须提到分区号
  • 它在最后通过参数--partition 0 后工作。
  • 我在 2.x 上也发现了同样的问题。这个答案解决了。
  • 这只是一个补丁解决方案,我建议验证 zookeeper 和 kafka brokers 上的所有警告和错误日志。一旦集群健康,就可以尝试使用 boostrap 服务器进行读取。
  • 为什么这个解决方案有效?这是 Kafka 中的错误吗?
【解决方案2】:

我刚遇到这个问题,解决方法是在zookeeper中删除/brokers,然后重启kafka节点。

bin/zookeeper-shell <zk-host>:2181

然后

rmr /brokers

不知道为什么这可以解决它。

当我启用调试日志时,我在消费者中一遍又一遍地看到这个错误消息:

2017-07-07 01:20:12 DEBUG AbstractCoordinator:548 - Sending GroupCoordinator request for group test to broker xx.xx.xx.xx:9092 (id: 1007 rack: null) 2017-07-07 01:20:12 DEBUG AbstractCoordinator:559 - Received GroupCoordinator response ClientResponse(receivedTimeMs=1499390412231, latencyMs=84, disconnected=false, requestHeader={api_key=10,api_version=0,correlation_id=13,client_id=consumer-1}, responseBody={error_code=15,coordinator={node_id=-1,host=,port=-1}}) for group test 2017-07-07 01:20:12 DEBUG AbstractCoordinator:581 - Group coordinator lookup for group test failed: The group coordinator is not available. 2017-07-07 01:20:12 DEBUG AbstractCoordinator:215 - Coordinator discovery failed for group test, refreshing metadata

【讨论】:

  • 我正在使用 CDH 5.13 和 CDK 4.0 (apache 2.1)。遇到同样的问题。但是,按照您的建议删除代理和主题后,消费者仍然没有收到消息。
  • 这个解决方案对我有用,但这并不容易,因为只是删除了 zookeeper 上的所有引用,我已经删除了 kafka 代理上的日志文件并确保集群是健康的,我做了一些更正集群,我没有列出所有已应用的修复程序,我建议访问 zookeeper 和 kafka 服务器日志并查看尝试删除所有错误和警告消息。
【解决方案3】:

对我来说,此线程中描述的解决方案有效 - https://stackoverflow.com/a/51540528/7568227

检查是否

offsets.topic.replication.factor

(或可能与复制相关的其他配置参数) 不高于经纪人的数量。这就是我的问题。

在此修复后不再需要使用 --partition 0。

否则我建议按照提到的线程中描述的调试过程。

【讨论】:

  • 嗨,miko,感谢您在 StackOVerflow 上回答问题 :-) 由于原始线程的基本部分是您提到的调试过程,请将其转移到您的问题中。
【解决方案4】:

在我的 Mac 上遇到了同样的问题。 我查看了日志,发现如下错误。

Number of alive brokers '1' does not meet the required replication factor '3' for the offsets topic (configured via 'offsets.topic.replication.factor'). 
This error can be ignored if the cluster is starting up and not all brokers are up yet.

这可以通过将复制因子更改为 1 来解决。在 server.properties 中添加以下行并重新启动 Kafka/Zookeeper。

offsets.topic.replication.factor=1

【讨论】:

【解决方案5】:

在我的情况下,这不起作用

kafka-console-consumer --bootstrap-server localhost:9092 --from-beginning --topic my-replicated-topic

这行得通

kafka-console-consumer --bootstrap-server localhost:9092 --from-beginning --topic my-replicated-topic --partition 0

因为主题__consumer_offsets 位于无法访问的代理上。基本上,我忘了复制它。搬迁 __consumer_offsets 解决了我的问题。

【讨论】:

    【解决方案6】:

    我遇到了同样的问题,现在我想通了。

    使用--zookeeper时,应该提供zookeeper地址作为参数。

    当你使用--bootstrap-server时,它应该带有代理地址作为参数。

    【讨论】:

    • 问题是他们已经提供了一个代理地址作为参数;端口 9092 是默认的 Kafka 端口。
    • 好吧,并非总是如此,如果你为 docker 下载 HDP 沙箱,它默认为 6667
    【解决方案7】:

    你的本地主机是这里的 foo。 如果您将 localhost 单词替换为实际主机名,它应该可以工作。

    像这样:

    制片人

    ./bin/kafka-console-producer.sh --broker-list \
    sandbox-hdp.hortonworks.com:9092 --topic test
    

    消费者:

    ./bin/kafka-console-consumer.sh --topic test --from-beginning \    
    --bootstrap-server bin/kafka-console-consumer.sh --new-consumer \
    --topic test --from-beginning \
    --bootstrap-server localhost:9092
    

    【讨论】:

    • 消费者的命令不正确。你能纠正一下吗?
    【解决方案8】:

    此问题还影响使用 Flume 从 kafka 提取数据并将数据下沉到 HDFS。

    解决上述问题:

    1. 停止 Kafka 代理
    2. 连接到 Zookeeper 集群并移除 /brokers z 节点
    3. 重启 kafka 代理

    我们使用集群的kafka客户端版本和scala版本没有问题。 Zookeeper 可能有关于代理主机的错误信息。

    验证操作:

    在 kafka 中创建主题。

    $ kafka-console-consumer --bootstrap-server slavenode01.cdh.com:9092 --topic rkkrishnaa3210 --from-beginning
    

    打开一个生产者频道并向其提供一些消息。

    $ kafka-console-producer --broker-list slavenode03.cdh.com:9092 --topic rkkrishnaa3210
    

    打开一个消费者通道来消费来自特定主题的消息。

    $ kafka-console-consumer --bootstrap-server slavenode01.cdh.com:9092 --topic rkkrishnaa3210 --from-beginning
    

    从水槽测试这个:

    Flume 代理配置:

    rk.sources  = source1
    rk.channels = channel1
    rk.sinks = sink1
    
    rk.sources.source1.type = org.apache.flume.source.kafka.KafkaSource
    rk.sources.source1.zookeeperConnect = ip-20-0-21-161.ec2.internal:2181
    rk.sources.source1.topic = rkkrishnaa321
    rk.sources.source1.groupId = flume1
    rk.sources.source1.channels = channel1
    rk.sources.source1.interceptors = i1
    rk.sources.source1.interceptors.i1.type = timestamp
    rk.sources.source1.kafka.consumer.timeout.ms = 100
    rk.channels.channel1.type = memory
    rk.channels.channel1.capacity = 10000
    rk.channels.channel1.transactionCapacity = 1000
    rk.sinks.sink1.type = hdfs
    rk.sinks.sink1.hdfs.path = /user/ce_rk/kafka/%{topic}/%y-%m-%d
    rk.sinks.sink1.hdfs.rollInterval = 5
    rk.sinks.sink1.hdfs.rollSize = 0
    rk.sinks.sink1.hdfs.rollCount = 0
    rk.sinks.sink1.hdfs.fileType = DataStream
    rk.sinks.sink1.channel = channel1
    

    运行水槽代理:

    flume-ng agent --conf . -f flume.conf -Dflume.root.logger=DEBUG,console -n rk
    

    观察来自消费者的日志,即来自主题的消息是用 HDFS 编写的。

    18/02/16 05:21:14 INFO internals.AbstractCoordinator: Successfully joined group flume1 with generation 1
    18/02/16 05:21:14 INFO internals.ConsumerCoordinator: Setting newly assigned partitions [rkkrishnaa3210-0] for group flume1
    18/02/16 05:21:14 INFO kafka.SourceRebalanceListener: topic rkkrishnaa3210 - partition 0 assigned.
    18/02/16 05:21:14 INFO kafka.KafkaSource: Kafka source source1 started.
    18/02/16 05:21:14 INFO instrumentation.MonitoredCounterGroup: Monitored counter group for type: SOURCE, name: source1: Successfully registered new MBean.
    18/02/16 05:21:14 INFO instrumentation.MonitoredCounterGroup: Component type: SOURCE, name: source1 started
    18/02/16 05:21:41 INFO hdfs.HDFSDataStream: Serializer = TEXT, UseRawLocalFileSystem = false
    18/02/16 05:21:42 INFO hdfs.BucketWriter: Creating /user/ce_rk/kafka/rkkrishnaa3210/18-02-16/FlumeData.1518758501920.tmp
    18/02/16 05:21:48 INFO hdfs.BucketWriter: Closing /user/ce_rk/kafka/rkkrishnaa3210/18-02-16/FlumeData.1518758501920.tmp
    18/02/16 05:21:48 INFO hdfs.BucketWriter: Renaming /user/ce_rk/kafka/rkkrishnaa3210/18-02-16/FlumeData.1518758501920.tmp to /user/ce_rk/kafka/rkkrishnaa3210/18-02-16/FlumeData.1518758501920
    18/02/16 05:21:48 INFO hdfs.HDFSEventSink: Writer callback called.
    

    【讨论】:

      【解决方案9】:

      使用这个:

      $ bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --from-beginning
      

      注意:从您的命令中删除 --new-consumer

      参考这里:https://kafka.apache.org/quickstart

      【讨论】:

      • 那么它就不会使用新的消费者了,问题是如何使用新的消费者来获取消息。
      • 0.9.0.0 在这个版本的 Kafka 中,他们的新控制台消费者无法正常工作,他们提供了 java 消费者,但没有提供控制台消费者。现在他们已经从更高版本中完全删除了“--new-consumer”。
      • 这个答案是我更正后编辑的,所以请忽略我之前的评论。
      【解决方案10】:

      你可以试试这样吗:

      bin/kafka-console-consumer.sh --zookeeper localhost:2181 --from-beginning --topic my-replicated-topic
      

      【讨论】:

      • 此命令用于运行旧消费者 API。问题中已经提到了您的答案。
      【解决方案11】:

      在我的情况下,使用这两种方法都不起作用,然后我还在 config/log4j.properties 将日志级别提高到 DEBUG,启动了控制台使用者

      ./bin/kafka-console-consumer.sh --bootstrap-server 127.0.0.1:9092 --from-beginning --topic MY_TOPIC
      

      然后得到下面的日志

      [2018-03-11 12:11:25,711] DEBUG [MetadataCache brokerId=10] Error while fetching metadata for MY_TOPIC-3: leader not available (kafka.server.MetadataCache)
      

      这里的重点是,我有两个 kafka 节点,但一个已关闭,由于某种原因,默认情况下 kafka-console 消费者不会消费,如果由于节点已关闭而导致某些分区不可用(在这种情况下为分区 3) .它不会发生在我的应用程序中。

      可能的解决方案是

      • 启动宕机代理
      • 删除主题并重新创建,这样所有分区都将放置在在线代理节点上

      【讨论】:

        【解决方案12】:

        从 bin 运行以下命令:

        ./kafka-console-consumer.sh --topic test --from-beginning --bootstrap-server localhost:9092
        

        “test”是主题名称

        【讨论】:

          【解决方案13】:

          我遇到了消费者完成执行的问题 在 kafka_2.12-2.3.0.tgz 中。

          尝试调试但未打印任何日志。

          尝试使用 kafka_2.12-2.2.2 运行良好 .工作正常。

          并尝试从quickstart guide 运行 zookeeper 和 kafka!

          【讨论】:

            【解决方案14】:

            就我而言,server.properties 中的broker.id=1 是个问题。

            当你只使用一个kafka服务器进行开发时,这应该是broker.id=0

            不要忘记删除所有日志并重新启动 zookeper 和 kafka

            • 删除/tmp/kafka-logs(在server.properties文件中定义)
            • 删除[your_kafka_home]/logs
            • 重启 Zookeper 和 Kafka

            【讨论】:

              【解决方案15】:

              在 kafka_2.11-0.11.0.0 中,zookeeper 服务器已被弃用,它正在使用 bootstrap-server,它将采用代理 IP 地址和端口。如果您提供正确的代理参数,您将能够使用消息。

              例如$ bin/kafka-console-consumer.sh --bootstrap-server :9093 --topic test --from-beginning

              我使用的是 9093 端口,对你来说可能会有所不同。

              问候。

              【讨论】:

                【解决方案16】:

                复制因子必须至少为 3

                ./bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --from-beginning --topic test 
                

                【讨论】:

                  猜你喜欢
                  • 2021-06-06
                  • 2016-05-15
                  • 2018-07-01
                  • 1970-01-01
                  • 1970-01-01
                  • 2019-09-24
                  • 1970-01-01
                  • 2017-09-23
                  • 1970-01-01
                  相关资源
                  最近更新 更多