【问题标题】:Can't get kafka consumer records with AUTO_OFFSET_RESET_CONFIG = "latest"无法使用 AUTO_OFFSET_RESET_CONFIG = "latest" 获取 kafka 消费者记录
【发布时间】:2022-01-20 10:34:13
【问题描述】:

我有一个测试自动化项目。我正在尝试从配置ConsumerConfig.AUTO_OFFSET_RESET_CONFIG = "latest" 的最新记录开始获取kafka 消费者记录。但它不起作用。这是我尝试轮询数据的代码:

for(int i=0; i<20; i++) {
            ConsumerRecords<String, String> consumerRecords = consumer.poll(Duration.ofMillis(500L));
            value = findValue(key, consumerRecords);
            if(value != null){
                break;
            }
        }

在此代码中,变量 consumerRecords 在每次迭代中的大小为 0。

如果我将ConsumerConfig.AUTO_OFFSET_RESET_CONFIG 更改为"earliest",则consumer.poll() 有效并且变量consumerRecords 的大小不是0,但是集合中的元素从最早的偏移量开始,而我需要从上次偏移量开始的元素。

如何通过偏移量按降序排列元素来实现consumerRecords

我尝试将超时时间增加到最多 10 秒 - 它没有帮助。

kafka-clients:2.7.0

【问题讨论】:

    标签: java apache-kafka kafka-consumer-api


    【解决方案1】:

    对于消费者AUTO_OFFSET_RESET_CONFIG = "latest",只有在消费者启动后有记录生成到主题时,消费者才会获得任何记录。您正在获取AUTO_OFFSET_RESET_CONFIG = "earliest" 的记录,因为该主题中已经存在记录。您可以获取更多信息here

    【讨论】:

      【解决方案2】:

      我需要从上次偏移开始的元素。

      然后你会想要在消费者从主题的末尾开始之后运行一些生产者,那里还没有数据。但是,在自动化测试期间,从主题末尾开始只有 20 次轮询可能会产生竞争条件,在这种情况下,消费者可以在生产者实际发送任何内容之前完成。

      元素按偏移量降序排列?

      向后迭代Kafka主题是不可行的

      【讨论】:

      • 好吧,如果你真的想向后迭代一个主题,你可以使用 Consumer.seek 方法和批量大小配置。不过我不建议这样做,也许在测试中会很好。
      • 这真的很慢,因为您需要一次消耗并寻找一个记录偏移量
      猜你喜欢
      • 2019-11-21
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      相关资源
      最近更新 更多