【问题标题】:Retrieve last n messages of Kafka consumer from a particular topic从特定主题中检索 Kafka 消费者的最后 n 条消息
【发布时间】:2020-03-30 16:10:44
【问题描述】:

kafka 版本:0.9.0.1

如果n = 20, 我必须得到一个主题的最后 20 条消息。

我试过了

kafkaConsumer.seekToBeginning();

但它会检索所有消息。我只需要获取最后 20 条消息。

这个话题可能有几十万条记录

public List<JSONObject> consumeMessages(String kafkaTopicName) {
  KafkaConsumer<String, String> kafkaConsumer = null;
  boolean flag = true;
  List<JSONObject> messagesFromKafka = new ArrayList<>();
  int recordCount = 0;
  int i = 0;
  int maxMessagesToReturn = 20;

  Properties props = new Properties();         
  props.put("bootstrap.servers", "localhost:9092");
  props.put("group.id", "project.group.id");
  props.put("max.partition.fetch.bytes", "1048576000");
  props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
  props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
  kafkaConsumer = new KafkaConsumer<>(props);

  kafkaConsumer.subscribe(Arrays.asList(kafkaTopicName));
  TopicPartition topicPartition = new TopicPartition(kafkaTopicName, 0);
  LOGGER.info("Subscribed to topic " + kafkaConsumer.listTopics());
  while (flag) {
    // will consume all the messages and store in records
    ConsumerRecords<String, String> records = kafkaConsumer.poll(1000);
    kafkaConsumer.seekToBeginning(topicPartition);

    // getting total records count
    recordCount = records.count();
    LOGGER.info("recordCount " + recordCount);
    for (ConsumerRecord<String, String> record : records) {
      if(record.value() != null) {
        if (i >= recordCount - maxMessagesToReturn) {
          // adding last 20 messages to messagesFromKafka
          LOGGER.info("kafkaMessage "+record.value());
          messagesFromKafka.add(new JSONObject(record.value()));
        }
        i++;
      }
    }
    if (recordCount > 0) {
      flag = false;
    }
  }
  kafkaConsumer.close();
  return messagesFromKafka;
}

【问题讨论】:

  • 注意:十万读者不会理解“十万”这个词。 (平均而言,你不应该期望超过几百人真正阅读你的问题......在接下来的几年里)
  • 另外:你见过stackoverflow.com/questions/52625995/… ???我会假设:当kafka-simple-consumer-shell.sh 可以给你第一个、下一个、最后一个……主题时……那么也应该有一个 API。
  • 我正在尝试那些 API,我没有找到任何 @GhostCat。
  • 会尽量简化,您只想使用最近的 20 条消息?还是每次投票最多只能获得 20 条记录?
  • 最近 20 个 @Deadpool

标签: java apache-kafka kafka-consumer-api kafka-records


【解决方案1】:

您可以使用kafkaConsumer.seekToEnd(Collection&lt;TopicPartition&gt; partitions) 查找给定分区的最后一个偏移量。根据文档:

"查找每个给定分区的最后一个偏移量。此函数进行惰性计算,仅在调用poll(Duration)position(TopicPartition) 时才在所有分区中查找最终偏移量。如果没有提供分区,则查找所有当前分配的分区的最终偏移量。"

然后您可以使用position(TopicPartition partition) 检索特定分区的位置。

然后你可以从中减少 20,并使用kafkaConsumer.seek(TopicPartition partition, long offset) 来获取最近的 20 条消息。

很简单,

kafkaConsumer.seekToEnd(partitionList);
long endPosition = kafkaConsumer.position(topicPartiton);
long recentMessagesStartPosition = endPosition - maxMessagesToReturn;
kafkaConsumer.seek(topicPartition, recentMessagesStartPosition);

现在您可以使用 poll() 检索最近的 20 条消息

这是一个简单的逻辑,但如果你有多个分区,你也必须考虑这些情况。我没有尝试过,但希望你能明白这个概念。

【讨论】:

  • 我快二十岁了。二十岁中。假设我的记录数是 100000。我得到的记录是 61,421 - 61,441。但我想要 99,980 - 1,00,000。我增加了“max.partition.fetch.bytes”的值,然后它从 82,525 到 82,845。无论'max.partition.fetch.bytes'值如何,它都应该工作
  • @praveenkumar 尝试使 autoCommit 为真
  • 另外,这也不是完美的解决方案。首先,endPosition 不会是最后一条记录的偏移量,而是下一条新记录的位置。第二个问题是逻辑最多返回 20 条记录。偏移量存在差距的原因有很多。但最大的问题是 poll():调用 poll() 时应该使用多长时间?一秒可能给你零记录,一分钟意味着你一直在等待一分钟。
猜你喜欢
  • 1970-01-01
  • 2020-03-19
  • 2018-07-27
  • 1970-01-01
  • 2016-12-23
  • 2017-10-19
  • 2019-06-08
  • 2020-02-08
  • 2018-07-01
相关资源
最近更新 更多