【问题标题】:Kafka consumer seek operation is not returning data卡夫卡消费者寻找操作没有返回数据
【发布时间】:2022-08-17 11:33:33
【问题描述】:

我有一个要求,我必须在外部监视一个消费者组,并检查消费者记录中的特定偏移量,该偏移量已经被上述消费者组消费了。我创建了一个AdminClient 来连接到集群并执行该操作。

现在,当我尝试对特定偏移量执行assign()seek() 操作然后轮询数据时,它总是返回一个空映射。

ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(10));

下面是我的代码。我登录到控制中心,可以看到以下主题分区和偏移量的数据。请帮助我确定问题。

Properties properties = new Properties();
properties.put(\"bootstrap.servers\", \"server_list\");
properties.put(\"security.protocol\", \"SASL_SSL\");
properties.put(\"ssl.truststore.location\", \".jks file path\");
properties.put(\"ssl.truststore.password\", \"****\");
properties.put(\"sasl.mechanism\", \"****\");
properties.put(\"sasl.kerberos.service.name\", \"****\");
properties.put(\"group.id\", grp_id);
properties.put(\"auto.offset.reset\", \"earliest\");
// properties.setProperty(ConsumerConfig.GROUP_ID_CONFIG,grp_id);  
//properties.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG,\"earliest\");  
properties.put(\"key.deserializer\", \"org.apache.kafka.common.serialization.StringDeserializer\");
properties.put(\"value.deserializer\", \"org.apache.kafka.common.serialization.StringDeserializer\");
properties.put(\"auto.offset.reset\", \"earliest\");
properties.put(\"enable.auto.commit\", \"false\");

KafkaConsumer < String, String > consumer = new KafkaConsumer < String, String > (properties);

try {

    TopicPartition partition0 = new TopicPartition(\"topic1\", 1);

    consumer.assign(Arrays.asList(partition0));
    long offset = 19 L;
    consumer.seek(tp, offset);
    boolean messageend = true;

    try {
        while (messageend) {
            ConsumerRecords < String, String > records = consumer.poll(Duration.ofMillis(10));
            if (null != records && !records.isEmpty()) {
                for (ConsumerRecord < String, String > record: records) {
                    if (record.offset() == offset) {
                        System.out.println(\"Match found\");
                        messageend = false;
                    }
                }
            } else {
                messageend = false;
            }
        }
    }
}
} catch (Exception e) {
    // TODO Auto-generated catch block
    e.printStackTrace();
}
  • 它从哪个偏移量轮询?
  • 它应该从给定的上述偏移量(偏移量 = 19L)进行轮询

标签: java apache-kafka kafka-consumer-api


【解决方案1】:

不清楚“外部监控”是什么意思,但如果该组中已经有活跃的消费者,则不应使用相同的消费者组名称从主题中读取。

换句话说,您的消费者将加入该组并可能导致现有消费者的重新平衡,或者如果分配的分区已在组中的其他位置分配,它将作为空闲加入并且不消耗任何内容。这似乎是你遇到的情况。

您应该能够在 CLI 上更轻松地执行此操作

kcat -C -b kafka:9092 -t topic1 -p 1 -o 19 -m 1

【讨论】:

  • 我不能使用 cli ,我必须以编程方式获取它
  • 好吧,这只是一个例子。你读了我剩下的答案吗?
【解决方案2】:
  1. 主题 1 有多少个分区?您需要至少有 2 个才能找到 TopicPartition("topic 1", 1)。

  2. 当 consumer.poll(Duration.ofMillis(10)); 时,超时 10 毫秒对我来说相当短。

  3. 您在同一组中有其他消费者吗?如果同组的consumer个数大于partition的个数,那么会有空闲的consumer,所以没有赋值,所以seek和poll都会失败

【讨论】:

    猜你喜欢
    • 1970-01-01
    • 1970-01-01
    • 2018-09-18
    • 2019-06-23
    • 1970-01-01
    • 2020-03-14
    • 2019-05-20
    • 2020-07-24
    相关资源
    最近更新 更多