【发布时间】:2017-03-10 09:32:14
【问题描述】:
我正在实现 Kafka 消费者类来接收消息。我只想每次都收到新消息。因此,我将enable.auto.commit 设置为true。然而,偏移似乎根本没有改变。即使主题、消费者组和分区始终相同。
这是我的消费者代码:
consumerConfig.put("bootstrap.servers", bootstrap);
consumerConfig.put("group.id", KafkaTestConstants.KAFKA_GROUP);
consumerConfig.put("enable.auto.commit", "true");
consumerConfig.put("auto.offset.reset", "earliest");
consumerConfig.put("auto.commit.interval", 1000);
consumerConfig.put("key.deserializer", StringDeserializer.class.getName());
consumerConfig.put("value.deserializer", StringDeserializer.class.getName());
StringDeserializer deserializer = new StringDeserializer();
KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<>(consumerConfig, deserializer, deserializer);
TopicPartition tp = new TopicPartition(KafkaTestConstants.KAFKA_TOPIC, 0);
List<TopicPartition> tps = Arrays.asList(tp);
kafkaConsumer.assign(tps);
long offset = kafkaConsumer.position(tp);
System.out.println("Offset of partition: " + offset);
List<String> messages = new ArrayList<String>();
ConsumerRecords<String, String> records = kafkaConsumer.poll(100);
for (ConsumerRecord<String, String> record : records) {
System.out.println("Message received: " + record.value());
messages.add(record.value());
}
consumer.commitAsync();
System.out.println("Offset commited.\n");
consumer.close();
无论我运行多少次,它总是显示偏移量为0。因此,它总是从一开始就接收所有消息。我错过了什么?
编辑:根据 Matthias 的回答,我决定手动提交偏移量。但是commitSync() 会挂起。 commitAsync() 之类的作品。稍后我将解释“某种”。以下是代码的作用:
producer send 2 messages;
consumer initiates;
print out current position;
consumer.poll();
print received messages;
consumer.commitAsync();
这就是这段代码的行为方式。假设我有 100 条消息。现在生产者发送 2 条新消息。在消费者轮询之前,它会显示当前偏移位置为 102,应该是 100。因此,不会打印出新消息。这几乎就像在生产者发送消息后更新偏移量一样。
【问题讨论】:
-
“auto.commit.interval”到“auto.commit.interval.ms”? -> kafka.apache.org/documentation/#newconsumerconfigs
-
@QuentinGeff 改成
auto.commit.interval.ms还是一样。 -
您是否尝试手动控制消费者偏移量?在这里解释:kafka.apache.org/0101/javadoc/index.html?org/apache/kafka/…
-
@QuentinGeff 我尝试了
commitSync()和commitAsync(),但仍然无法正常工作。我在收到每条消息后打印出偏移量,它的增量很好。似乎每次运行后偏移量都会以某种方式重置。
标签: java apache-kafka kafka-consumer-api