【发布时间】:2019-03-24 02:14:47
【问题描述】:
我目前开发了一个代码,可以显示主题、分区和日志偏移量。但我目前坚持如何获得分区的滞后。我知道有一个 kafka offset 命令可以执行此功能,但我需要的是一个 java 代码。
public static void main(String[] args) throws Exception {
System.out.println("START CONSUMER");final Properties props = new Properties();
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
props.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SASL_PLAINTEXT");
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS);
props.put(ConsumerConfig.GROUP_ID_CONFIG, GROUPID);
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest");
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
// Create the consumer using props.
final Consumer<Long, String> consumer = new KafkaConsumer<>(props);
// Subscribe to the topic.
int i = 0;
ArrayList<TopicPartition> partitions = new ArrayList<TopicPartition>();
for (i=0;i<consumer.partitionsFor(TOPIC).size();i++)
{
TopicPartition partitiontemp = new TopicPartition(TOPIC, i);
partitions.add(partitiontemp);
}
consumer.assign(partitions);
consumer.seekToEnd(partitions);
for (i=0;i<consumer.partitionsFor(TOPIC).size();i++)
{
System.out.printf("Topic: %s partitionID: %d log offset: %d \n", TOPIC, i, consumer.position(partitions.get(i)));
}
System.out.printf("CREATE CONSUMER DONE");
consumer.close();
我需要做的是输出主题、分区、当前偏移量、日志偏移量和滞后。如何获得我的代码的滞后或如何获得我的代码的当前偏移量。 (有关所需输出,请参见图片)。
注意:我不能使用(foreach 记录)功能,因为我不能读取输入文件中的每条记录。
【问题讨论】:
-
为什么需要的输出标签不同?
-
Kafka offset命令也运行Java(其实是Scala),源码在Github上,如果你只是想复现的话
标签: java apache-kafka kafka-consumer-api