【发布时间】:2021-09-29 21:07:43
【问题描述】:
当我们成功读取某个主题的所有条目后,我想解锁CountDownLatch。我目前的方法与 Spring Kafka 类似:
private final CountDownLatch latch = new CountDownLatch(1);
@KafkaListener(
topicPartitions = @TopicPartition(topic = KAFKA_TOPIC, partitions = {"0"}),
containerFactory = "factory")
public void consume(
List<Payload> payload,
Consumer<?, ?> consumer,
@Header(KafkaHeaders.OFFSET) Long offset) {
// handle all entries:
// payload.forEach(eventHandler::handle);
// offset of last committed message + 1
int endOffset = consumer.endOffsets(List.of(TOPIC_PART)).get(TOPIC_PART);
// when we have processed the last entry, we unlock the latch.
if (offset + payload.size() >= endOffset) {
latch.countDown();
}
}
问题是当Kafka主题为空时latch永远不会被解锁,因为该方法永远不会运行。主题为空时如何解锁?
【问题讨论】:
-
这种情况下
offset、endOffset和payload.size()的值是多少? -
由于主题为空,因此consume()方法从未运行过。
-
您应该使用 GetOffsetShell / AdminClient 列出主题偏移量,而不是初始化消费者。此外,您的主题大小永远不会改变吗? Kafka 主题没有任何“结束”,并且随着保留开始删除记录而不断增加
标签: java apache-kafka spring-kafka