【问题标题】:Why doesn't offset get updated when messages are consumed in Kafka为什么在Kafka中消费消息时偏移量不更新
【发布时间】:2017-03-10 09:32:14
【问题描述】:

我正在实现 Kafka 消费者类来接收消息。我只想每次都收到新消息。因此,我将enable.auto.commit 设置为true。然而,偏移似乎根本没有改变。即使主题、消费者组和分区始终相同。

这是我的消费者代码:

    consumerConfig.put("bootstrap.servers", bootstrap);
    consumerConfig.put("group.id", KafkaTestConstants.KAFKA_GROUP);
    consumerConfig.put("enable.auto.commit", "true");
    consumerConfig.put("auto.offset.reset", "earliest");
    consumerConfig.put("auto.commit.interval", 1000);
    consumerConfig.put("key.deserializer", StringDeserializer.class.getName());
    consumerConfig.put("value.deserializer", StringDeserializer.class.getName());
    StringDeserializer deserializer = new StringDeserializer();
    KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<>(consumerConfig, deserializer, deserializer);

    TopicPartition tp = new TopicPartition(KafkaTestConstants.KAFKA_TOPIC, 0);
    List<TopicPartition> tps = Arrays.asList(tp);
    kafkaConsumer.assign(tps);
    long offset = kafkaConsumer.position(tp);
    System.out.println("Offset of partition: " + offset);

    List<String> messages = new ArrayList<String>();
    ConsumerRecords<String, String> records = kafkaConsumer.poll(100);

    for (ConsumerRecord<String, String> record : records) {
        System.out.println("Message received: " + record.value());
        messages.add(record.value());
    }

    consumer.commitAsync();
    System.out.println("Offset commited.\n");
    consumer.close();

无论我运行多少次,它总是显示偏移量为0。因此,它总是从一开始就接收所有消息。我错过了什么?

编辑:根据 Matthias 的回答,我决定手动提交偏移量。但是commitSync() 会挂起。 commitAsync() 之类的作品。稍后我将解释“某种”。以下是代码的作用:

producer send 2 messages;
consumer initiates;
print out current position;
consumer.poll();
print received messages;
consumer.commitAsync();

这就是这段代码的行为方式。假设我有 100 条消息。现在生产者发送 2 条新消息。在消费者轮询之前,它会显示当前偏移位置为 102,应该是 100。因此,不会打印出新消息。这几乎就像在生产者发送消息后更新偏移量一样。

【问题讨论】:

标签: java apache-kafka kafka-consumer-api


【解决方案1】:

仅当您使用消费者组管理时,自动提交才有效,为此,您需要“订阅”主题,而不是手动“分配”分区。

比较KafkaConsumer 的JavaDocs。这是一篇很长的文章,但需要了解如何正确使用消费者的微妙细节:https://kafka.apache.org/0102/javadoc/index.html?org/apache/kafka/clients/consumer/KafkaConsumer.html

此外,如果启用了自动提交,它将在poll 内提交(即,对poll() 的调用可能会提交从先前对poll() 的调用返回的消息)而不是在您迭代返回时消息。这也意味着,您的提交将向前“跳跃”,例如从提交的偏移量 0 到 100(如果您通过轮询单个分区收到 100 条消息)。

【讨论】:

  • @MatthiasJSax subsription 对我不起作用。 polling 将永远挂起。这就是我切换到assign 的原因。请参阅question I asked earlier
  • 不知道为什么subscribe() 让消费者挂了。但是,如果您不使用“消费者组管理”,则需要手动处理故障转移场景(即,如果您有多个消费者并且需要确保分配不同的分区 - 如果您不想阅读数据多次 - 您需要自己检测故障并手动将失败消费者的分区重新分配给剩余的消费者 - 某些数据未得到处理)。还需要手动提交,启动时手动读取提交的偏移量才能恢复,“auto.offset.reset”也不起作用。
  • 因此,我建议找到subscribe() 问题的根源,而不是切换到assign()。但这当然取决于你。
  • 我尝试过手动偏移提交。它似乎确实更新了偏移量。然而,提交似乎发生在消息产生之后和消费之前。在我的编辑中查看详细信息。
  • 你能更新你的消费者代码吗?你用KafkaConsumer#committed(TopicPartitions) 吗?
猜你喜欢
  • 1970-01-01
  • 2015-11-30
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 2020-08-28
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
相关资源
最近更新 更多