【发布时间】:2022-01-26 22:25:56
【问题描述】:
我有一个从 Kafka 主题轮询记录的消费者,我正在执行以下操作:
- 将消费者分配到 kafka 主题中的特定分区。
- 寻找过去的特定偏移量(因此肯定有要轮询的记录)。
- 我正在执行以下代码:
while (true)
{
ConsumerRecords<GenericRecord, GenericRecord> items=
consumer.poll(Duration.ofMillis(300));
log.info("Polled {} items", items.count());
}
我得到以下日志:
Polled 0 items
Polled 0 items
Polled 0 items
Polled 0 items
.
.
.
Polled 0 items
Polled 3620 items
我只是想了解民意调查的行为,以及为什么它在多次尝试后得到 0 条消息,然后在稍后的时间点得到记录? (请记住,我寻求的是过去的偏移量)。
这是我的消费者配置:
{
schema.registry.url=https://schema-registry.*********,
enable.auto.commit=false,
max.poll.records=65536,
group.id=**************,
fetch.max.wait.ms=5000,
bootstrap.servers=********
fetch.min.bytes=1048576,
fetch.max.bytes=1048576,
auto.offset.reset=earliest
}
【问题讨论】:
-
请也添加您的消费者配置。
-
已经做了,谢谢提示!
标签: java apache-kafka kafka-consumer-api