【发布时间】:2022-01-20 10:34:13
【问题描述】:
我有一个测试自动化项目。我正在尝试从配置ConsumerConfig.AUTO_OFFSET_RESET_CONFIG = "latest" 的最新记录开始获取kafka 消费者记录。但它不起作用。这是我尝试轮询数据的代码:
for(int i=0; i<20; i++) {
ConsumerRecords<String, String> consumerRecords = consumer.poll(Duration.ofMillis(500L));
value = findValue(key, consumerRecords);
if(value != null){
break;
}
}
在此代码中,变量 consumerRecords 在每次迭代中的大小为 0。
如果我将ConsumerConfig.AUTO_OFFSET_RESET_CONFIG 更改为"earliest",则consumer.poll() 有效并且变量consumerRecords 的大小不是0,但是集合中的元素从最早的偏移量开始,而我需要从上次偏移量开始的元素。
如何通过偏移量按降序排列元素来实现consumerRecords?
我尝试将超时时间增加到最多 10 秒 - 它没有帮助。
kafka-clients:2.7.0
【问题讨论】:
标签: java apache-kafka kafka-consumer-api