【问题标题】:Kafka 0.10 Java consumer not reading message from topicKafka 0.10 Java 消费者未从主题读取消息
【发布时间】:2018-07-27 08:00:31
【问题描述】:

我有一个像下面这样的简单 java 生产者

public class Producer 
{
    private final static String TOPIC = "my-example-topi8";
    private final static String BOOTSTRAP_SERVERS = "localhost:8092";

    public static void main( String[] args ) throws Exception {
        Producer<String, byte[]> producer = createProducer();
        for(int i=0;i<3000;i++) {
            String msg = "Test Message-" + i;
            final ProducerRecord<String, byte[]> record = new ProducerRecord<String, byte[]>(TOPIC, "key" + i, msg.getBytes());
            producer.send(record).get();
            System.out.println("Sent message " + msg);
        }
        producer.close();
    }

    private static Producer<String, byte[]> createProducer() {
        Properties props = new Properties();
        props.put("metadata.broker.list", BOOTSTRAP_SERVERS);
        props.put("bootstrap.servers", BOOTSTRAP_SERVERS);
        props.put("client.id", "AppFromJava");
        props.put("serializer.class", "kafka.serializer.DefaultEncoder");
        props.put("key.serializer.class", "kafka.serializer.StringEncoder");
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("compression.codec", "snappy");
        props.put("value.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer");
        return new KafkaProducer<String, byte[]>(props);
    }
}

我正在尝试读取如下数据

public class Consumer 
{
    private final static String TOPIC = "my-example-topi8";
    private final static String BOOTSTRAP_SERVERS = "localhost:8092";

    public static void main( String[] args ) throws Exception {
        Consumer<String, byte[]> consumer = createConsumer();
        start(consumer);
    }

    static void start(Consumer<String, byte[]> consumer) throws InterruptedException {
        final int giveUp = 10;   
        int noRecordsCount = 0;
        int stopCount = 1000;

        while (true) {
            final ConsumerRecords<String, byte[]> consumerRecords = consumer.poll(1000);
            if (consumerRecords.count()==0) {
                noRecordsCount++;
                if (noRecordsCount > giveUp) break;
                else continue;
            }


            consumerRecords.forEach(record -> {
               // Process the record System.out.printf("\nConsumer Record:(%s, %s, %s)", record.key(), new String(record.value()), record.topic());
            });

            consumer.commitSync();
            break;
        }
        consumer.close();
        System.out.println("DONE");
    }

    private static Consumer<String, byte[]> createConsumer() {
        final Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,
                                    BOOTSTRAP_SERVERS);
        props.put(ConsumerConfig.GROUP_ID_CONFIG,
                                    "KafkaExampleConsumer");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
                StringDeserializer.class.getName());
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
                ByteArrayDeserializer.class.getName());
        props.put(ConsumerConfig.CLIENT_ID_CONFIG, "1234");
        props.put("enable.auto.commit", "false");

        // Create the consumer using props.
        final Consumer<String, byte[]> consumer = new KafkaConsumer(props);
        consumer.subscribe(Collections.singletonList(TOPIC));
        return consumer;
    }
}

但是消费者没有从 kafka 读取任何消息。如果我在start()

处添加以下内容
consumer.poll(0);
consumer.seekToBeginning(consumer.assignment());

然后消费者开始阅读主题。但是每次重新启动消费者时,它都会从我不想要的主题的开头读取消息。如果我在启动 Consumer 时添加以下配置

props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");

然后它从主题中读取消息,但如果消费者在处理所有消息之前重新启动,则它不会读取未处理的消息。

谁能告诉我出了什么问题,我该如何解决这个问题?

Kafka broker 和 zookeeper 使用默认配置运行。

【问题讨论】:

  • 如果设置props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"),是消费者读取一整批消息的问题,它在停止之前只处理部分消息,重新启动它会跳过读取的整批消息前?如果是这种情况,那么问题可能是所有读取消息后的偏移量都被自动提交。您可能希望禁用自动提交并提交实际处理的偏移量,即使读取了更多消息或减少消费者将读取的最大批量大小。
  • 在我发布在createConsumer() 的代码中,我正在设置这个props.put("enable.auto.commit", "false"); 我遇到的问题是假设有5000 条消息。然后假设消费者在通过 commitSync 提交此批次后,收到了 1000 条消息。如果消费者重新启动,那么我看不到消费者从 1001 开始收到任何消息。如果我不清楚我想问什么,请告诉我。
  • 好的。那我的评论就没有实际意义了。
  • 我面临错误:- java.lang.NoSuchMethodError: org.apache.kafka.clients.consumer.Consumer.subscribe(Ljava/util/Collection;)V

标签: java apache-kafka kafka-consumer-api kafka-producer-api


【解决方案1】:

您对 commitSync() 的调用是确认来自上次 poll() 的批处理中的所有消息,而不仅仅是每个单独的消息,因为您正在处理它们,这是我认为您正在尝试做的。

来自文档

“上面的示例使用 commitSync 将所有接收到的记录标记为已提交。在某些情况下,您可能希望通过显式指定偏移量来更好地控制已提交的记录。在下面的示例中,我们在处理完每个分区中的记录后提交偏移量。

 try {
     while(running) {
         ConsumerRecords<String, String> records = consumer.poll(Long.MAX_VALUE);
         for (TopicPartition partition : records.partitions()) {
             List<ConsumerRecord<String, String>> partitionRecords = records.records(partition);
             for (ConsumerRecord<String, String> record : partitionRecords) {
                 System.out.println(record.offset() + ": " + record.value());
             }
             long lastOffset = partitionRecords.get(partitionRecords.size() - 1).offset();
             consumer.commitSync(Collections.singletonMap(partition, new OffsetAndMetadata(lastOffset + 1)));
         }
     }
 } finally {
   consumer.close();
 }

注意:提交的偏移量应该始终是您的应用程序将读取的下一条消息的偏移量。因此,在调用 commitSync(offsets) 时,您应该在最后处理的消息的偏移量上加一。 ”

【讨论】:

  • 在我发布的玩具代码中,我正在通过consumerRecords.forEach(...) 处理整批消息,然后我通过consumer.commitSync(); 提交这批消息我遇到的问题是假设有 5000 条消息.然后假设消费者在通过commitSync 提交这批消息后,一批批收到了 1000 条消息。如果消费者重新启动,那么我看不到消费者从 1001 开始收到任何消息。如果我不清楚,请告诉我。
  • 您的停止重启时间是否超过 24 小时?除非更改 __consumer_offsets 主题的配置,否则提交的偏移量默认在 24 小时后过期。你能发布一个 kafka-topics.sh 的输出——描述消费者偏移主题吗?
  • 也许将 try/catch 放在 commitSync() 周围,以确保它不会引发不可恢复的异常。提交可能未成功完成。
  • 你是对的,处理消息时出现异常,绕过了剩余的处理。 commitSync 正在提交整个批次并将偏移量移到末尾。
猜你喜欢
  • 1970-01-01
  • 2018-12-15
  • 2021-07-09
  • 2018-07-01
  • 2017-05-12
  • 2017-10-25
  • 2017-11-09
  • 2018-06-04
  • 2022-01-04
相关资源
最近更新 更多