【问题标题】:Simple Kafka Consumer not receiving messages简单的 Kafka 消费者没有收到消息
【发布时间】:2017-11-29 08:40:24
【问题描述】:

我是 Kafka 的新手,正在运行 KafkaConsumerKafkaProducer 上给出的简单 kafka 消费者/生产者示例。当我从终端运行消费者时,消费者正在接收消息,但我无法使用 Java 代码进行监听。 我也在 StackoverFlow 上搜索过类似的问题(链接:Link1Link2)并尝试了这些解决方案,但似乎没有任何东西对我有用。 Kafka版本:kafka_2.10-0.10.2.1,pom中使用了对应的maven依赖。

生产者和消费者的Java代码:

public class SimpleProducer {
public static void main(String[] args) throws InterruptedException {
    Properties props = new Properties();
    props.put("bootstrap.servers", "localhost:9094");
    props.put("acks", "all");
    props.put("retries", 0);
    props.put("batch.size", 16384);
    props.put("linger.ms", 1);
    props.put("buffer.memory", 33554432);
    props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
    props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

    Producer<String, String> producer = new KafkaProducer<>(props);
    for(int i = 0; i < 10; i++)
        producer.send(new ProducerRecord<String, String>("topic3", Integer.toString(i), Integer.toString(i)));

    producer.close();

}}

public class SimpleConsumer {

public static void main(String[] args) {
    Properties props = new Properties();
    props.put("bootstrap.servers", "localhost:9094");
    props.put("group.id", "test");
    props.put("zookeeper.connect", "localhost:2181");
    props.put("enable.auto.commit", "true");
    props.put("auto.commit.interval.ms", "1000");
    props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
    props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
    KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
    consumer.subscribe(Arrays.asList("topic3", "topic2"));
    while (true) {
        ConsumerRecords<String, String> records = consumer.poll(100);
        for (ConsumerRecord<String, String> record : records)
            System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());
    }
}}

开始卡夫卡: bin/kafka-server-start.sh config/server.properties(我已经在属性文件中设置了端口、brokerid)

【问题讨论】:

  • 当您从终端说您的意思是使用 kafka 消费者/生产者工具时?也许发布您的 Java 源代码可能会有用。
  • 添加了代码。是的,如果我从终端运行 kafka 消费者,我可以从上面的 Java 生产者代码中监听消息。
  • 为您的 java 生产者启用日志记录并将其设置为 DEBUG 级别以查看您的生产者在做什么。还要确保主题名称正确,并且您的消费者具有从头开始的设置
  • 你不需要放 "zookeeper.connect" 属性。

标签: java maven apache-kafka kafka-consumer-api


【解决方案1】:

首先使用以下命令检查所有可用的组:

./kafka-consumer-groups.sh --bootstrap-server localhost:9092 --list

然后使用下面的 cmd 检查您的主题属于哪个组:

./kafka-consumer-groups.sh --bootstrap-server localhost:9092 --group <your group name> --describe

一旦您找到您的主题和相关组名(如果它不属于默认组,只需将 group.id 替换为您的组)然后尝试使用以下道具并让我知道它是否有效:

  props.put("bootstrap.servers", "localhost:9092");
  props.put("group.id", "test-consumer-group"); // default topic name
  props.put("enable.auto.commit", "true");
  props.put("auto.commit.interval.ms", "1000");
  props.put("session.timeout.ms", "30000");
  props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
  props.put("value.deserializer","org.apache.kafka.common.serialization.StringDeserializer");
  KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(props);

  //Kafka Consumer subscribes list of topics here.
  consumer.subscribe(Arrays.asList(topicName));  // replace you topic name

  //print the topic name

  java.util.Map<String,java.util.List<PartitionInfo>> listTopics = consumer.listTopics();
  System.out.println("list of topic size :" + listTopics.size());

  for(String topic : listTopics.keySet()){
      System.out.println("topic name :"+topic);
  }

【讨论】:

  • 它列出了所有主题,包括消费者订阅的主题(topic3)
  • 用java程序或通过cmd列出所有主题?如果通过 java 程序,这意味着您的主题属于 默认组 id,如果是这种情况,那么现在使用上面的道具,您应该能够从主题中获取数据。
  • 谢谢一百万!!您的解决方案对我有用。这么多天以来我一直在努力解决这个问题。但是如果我想创建一个消费者组并将其链接到我创建的主题,我该怎么做才能不使用默认设置。
  • 我遇到了同样的问题,当使用“test-consumer-group”作为组 ID 时它开始工作。但我不明白为什么如果我放置一个不同的不存在的组 ID 它不起作用。你能解释一下原因吗?
  • 我第一次使用 kafka 并且遇到了同样的问题。 @Sanjay 你的回答真的很有帮助。
【解决方案2】:

在运行生产者之前运行消费者,以便消费者首先向组协调器注册。稍后当你运行生产者时,消费者消费消息。你第一次运行消费者时,它向组协调器注册。为了找出消费者消费消息的偏移量使用这个kafka-consumer-offset-checker.bat --group group-1 --topic testing-1 --zookeeper localhost:2181这表明消费者最后消费了主题的哪个偏移量。

【讨论】:

  • 我面临错误:- java.lang.NoSuchMethodError: org.apache.kafka.clients.consumer.Consumer.subscribe(Ljava/util/Collection;)V
【解决方案3】:

清除您访问 kafka 的驱动器中的 'tmp' 文件夹。然后打开新的“cmd”命令窗口!重新启动服务器,并在命令窗口中发布“.\bin\windows\kafka-console-consumer.bat --bootstrap-server localhost:9092 --topic H1 --from-beginning”此代码以运行消费者而不会出现任何错误

【讨论】:

    【解决方案4】:

    尝试将enable.partition.eof参数设置为false

    props.put("enable.partition.eof", "false");
    

    这对我有用。

    【讨论】:

    • 添加哪个文件?
    • 我在代码中做到了。但我猜有些连接器可以配置一个文件。
    【解决方案5】:

    试试这个代码对我有用。

    Properties props = new Properties();
    props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
    props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
    "org.apache.kafka.common.serialization.StringDeserializer");
    props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, 
    "org.apache.kafka.common.serialization.StringDeserializer");
    props.put(ConsumerConfig.GROUP_ID_CONFIG, "test-consumer-group");
    KafkaConsumer<String, String> myConsumer = new KafkaConsumer<>(props);
    myConsumer.subscribe(Arrays.asList(topicName));
    myConsumer.subscribe(topics);
    
    try{
          while (true) {
                  ConsumerRecords<String, String> records = myConsumer.poll(100);
                  for (ConsumerRecord<String, String> record : records) {
                      System.out.println(String.format( "Topic: %s, Partition: %d, Offset: %d, key: %s, value: %s",
                              record.topic(),record.partition(), record.offset(),record.key(),record.value()
                      ));
                  }}
        }catch (Exception e){
            System.out.println(e.getMessage());
        }finally {
            myConsumer.close();
        }
    

    【讨论】:

      【解决方案6】:

      当我在 Windows 7 上本地安装最新版本的 kafka kafka_2.13-2.6.0.tgz (asc, sha512) 时遇到了这个问题。消息没有从生产者流向消费者。 检查发现没有创建消费者偏移主题不知道为什么。

      安装了早期版本的 Kakfa kafka_2.12-2.5.0.tgz 并开始工作。消费者偏移主题是用旧版本创建的

      【讨论】:

        猜你喜欢
        • 2018-09-12
        • 2020-11-15
        • 2022-08-13
        • 2017-06-13
        • 1970-01-01
        • 1970-01-01
        • 2021-06-06
        • 2016-05-15
        • 1970-01-01
        相关资源
        最近更新 更多