【问题标题】:Kafka Consumer returns null value卡夫卡消费者返回空值
【发布时间】:2021-06-16 10:10:09
【问题描述】:

我正在尝试在 Java 中创建一个 kafka 消费者,但 consumer.poll(5000) 方法调用无论如何都会返回 null 值。这是代码:

package com.apache.kafka.consumer;


import java.util.Map;
import java.util.Properties;

import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.log4j.Logger;

import org.apache.kafka.clients.consumer.ConsumerRecords;

public class Consumer {
    public static void main(String[] args) throws Exception {
        final Logger logger = Logger.getLogger(Consumer.class);
          //Kafka consumer configuration settings
          String topicName = "mytopic";
          Properties props = new Properties();
      
      
           props.put("bootstrap.servers", "localhost:9092");
           props.put("group.id", "test");
           props.put("enable.auto.commit", "true");
           props.put("auto.offset.reset","earliest");
           props.put("auto.commit.interval.ms", "1000");
           props.put("key.deserializer", 
           "org.apache.kafka.common.serialization.StringDeserializer");
           props.put("value.deserializer", 
           "org.apache.kafka.common.serialization.StringDeserializer");
           props.put("partition.assignment.strategy", "range");
           KafkaConsumer<String, String> consumer = new 
           KafkaConsumer<String, String>(props);
   
      //Kafka Consumer subscribes list of topics here.
      consumer.subscribe("sampletopic");
      while (true) {
          Map<String,ConsumerRecords<String, String>> records = consumer.poll(0);
          for (ConsumerRecords<String, String> record : records.values()) {
              System.out.println(records);
          }
     }
}

}

请帮忙!!!

我已经创建了主题,并且在其中添加了一些数据,而且 zookeeper 和 kafka 运行良好。我不知道为什么 poll() 方法返回 null。

【问题讨论】:

    标签: java apache-kafka kafka-consumer-api consumer


    【解决方案1】:

    poll 的调用需要在一个循环中,这就是为什么文献称它为轮询循环

    如果它返回null,它要么轮询过早并退出main,要么主题中没有数据

    在此处查看用法示例https://kafka.apache.org/22/javadoc/org/apache/kafka/clients/consumer/KafkaConsumer.html

         Properties props = new Properties();
         props.setProperty("bootstrap.servers", "localhost:9092");
         props.setProperty("group.id", "test");
         props.setProperty("enable.auto.commit", "true");
         props.setProperty("auto.commit.interval.ms", "1000");
         props.setProperty("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
         props.setProperty("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
         KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
         consumer.subscribe(Arrays.asList("foo", "bar"));
         while (true) {
             ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
             for (ConsumerRecord<String, String> record : records)
                 System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());
         }
    

    注意循环^

    【讨论】:

    • 好的,我照你说的做了,但我仍然为空。我还检查了 kafka 和 zookeeper 正在运行,并且主题“mytopic”中有值。
    • 你能更新你的问题,用你新的sn-p循环吗?另请注意,您没有像示例中那样设置value.deserializer
    • 感谢您对我的耐心,我现在已经用循环编辑了代码,还添加了value.deserializer仍然空指针异常。
    • 我解决了!这基本上是依赖问题,但在那之后我应用了你的解决方案,它就像一个魅力谢谢男人!
    猜你喜欢
    • 2020-10-28
    • 2019-05-20
    • 2019-07-03
    • 2018-05-05
    • 2021-08-22
    • 1970-01-01
    • 1970-01-01
    • 2020-10-28
    相关资源
    最近更新 更多