【发布时间】:2020-03-30 16:10:44
【问题描述】:
kafka 版本:0.9.0.1
如果n = 20,
我必须得到一个主题的最后 20 条消息。
我试过了
kafkaConsumer.seekToBeginning();
但它会检索所有消息。我只需要获取最后 20 条消息。
这个话题可能有几十万条记录
public List<JSONObject> consumeMessages(String kafkaTopicName) {
KafkaConsumer<String, String> kafkaConsumer = null;
boolean flag = true;
List<JSONObject> messagesFromKafka = new ArrayList<>();
int recordCount = 0;
int i = 0;
int maxMessagesToReturn = 20;
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("group.id", "project.group.id");
props.put("max.partition.fetch.bytes", "1048576000");
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
kafkaConsumer = new KafkaConsumer<>(props);
kafkaConsumer.subscribe(Arrays.asList(kafkaTopicName));
TopicPartition topicPartition = new TopicPartition(kafkaTopicName, 0);
LOGGER.info("Subscribed to topic " + kafkaConsumer.listTopics());
while (flag) {
// will consume all the messages and store in records
ConsumerRecords<String, String> records = kafkaConsumer.poll(1000);
kafkaConsumer.seekToBeginning(topicPartition);
// getting total records count
recordCount = records.count();
LOGGER.info("recordCount " + recordCount);
for (ConsumerRecord<String, String> record : records) {
if(record.value() != null) {
if (i >= recordCount - maxMessagesToReturn) {
// adding last 20 messages to messagesFromKafka
LOGGER.info("kafkaMessage "+record.value());
messagesFromKafka.add(new JSONObject(record.value()));
}
i++;
}
}
if (recordCount > 0) {
flag = false;
}
}
kafkaConsumer.close();
return messagesFromKafka;
}
【问题讨论】:
-
注意:十万读者不会理解“十万”这个词。 (平均而言,你不应该期望超过几百人真正阅读你的问题......在接下来的几年里)
-
另外:你见过stackoverflow.com/questions/52625995/… ???我会假设:当
kafka-simple-consumer-shell.sh可以给你第一个、下一个、最后一个……主题时……那么也应该有一个 API。 -
我正在尝试那些 API,我没有找到任何 @GhostCat。
-
会尽量简化,您只想使用最近的 20 条消息?还是每次投票最多只能获得 20 条记录?
-
最近 20 个 @Deadpool
标签: java apache-kafka kafka-consumer-api kafka-records