【发布时间】:2022-08-17 11:33:33
【问题描述】:
我有一个要求,我必须在外部监视一个消费者组,并检查消费者记录中的特定偏移量,该偏移量已经被上述消费者组消费了。我创建了一个AdminClient 来连接到集群并执行该操作。
现在,当我尝试对特定偏移量执行assign() 和seek() 操作然后轮询数据时,它总是返回一个空映射。
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(10));
下面是我的代码。我登录到控制中心,可以看到以下主题分区和偏移量的数据。请帮助我确定问题。
Properties properties = new Properties();
properties.put(\"bootstrap.servers\", \"server_list\");
properties.put(\"security.protocol\", \"SASL_SSL\");
properties.put(\"ssl.truststore.location\", \".jks file path\");
properties.put(\"ssl.truststore.password\", \"****\");
properties.put(\"sasl.mechanism\", \"****\");
properties.put(\"sasl.kerberos.service.name\", \"****\");
properties.put(\"group.id\", grp_id);
properties.put(\"auto.offset.reset\", \"earliest\");
// properties.setProperty(ConsumerConfig.GROUP_ID_CONFIG,grp_id);
//properties.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG,\"earliest\");
properties.put(\"key.deserializer\", \"org.apache.kafka.common.serialization.StringDeserializer\");
properties.put(\"value.deserializer\", \"org.apache.kafka.common.serialization.StringDeserializer\");
properties.put(\"auto.offset.reset\", \"earliest\");
properties.put(\"enable.auto.commit\", \"false\");
KafkaConsumer < String, String > consumer = new KafkaConsumer < String, String > (properties);
try {
TopicPartition partition0 = new TopicPartition(\"topic1\", 1);
consumer.assign(Arrays.asList(partition0));
long offset = 19 L;
consumer.seek(tp, offset);
boolean messageend = true;
try {
while (messageend) {
ConsumerRecords < String, String > records = consumer.poll(Duration.ofMillis(10));
if (null != records && !records.isEmpty()) {
for (ConsumerRecord < String, String > record: records) {
if (record.offset() == offset) {
System.out.println(\"Match found\");
messageend = false;
}
}
} else {
messageend = false;
}
}
}
}
} catch (Exception e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
-
它从哪个偏移量轮询?
-
它应该从给定的上述偏移量(偏移量 = 19L)进行轮询
标签: java apache-kafka kafka-consumer-api