【问题标题】:spring kafka, get topic size even when emptyspring kafka,即使为空也能获取主题大小
【发布时间】:2021-09-29 21:07:43
【问题描述】:

当我们成功读取某个主题的所有条目后,我想解锁CountDownLatch。我目前的方法与 Spring Kafka 类似:

private final CountDownLatch latch = new CountDownLatch(1);

@KafkaListener(
  topicPartitions = @TopicPartition(topic = KAFKA_TOPIC, partitions = {"0"}),
  containerFactory = "factory")
public void consume(
    List<Payload> payload,
    Consumer<?, ?> consumer,
    @Header(KafkaHeaders.OFFSET) Long offset) {
        // handle all entries:
        // payload.forEach(eventHandler::handle);

        // offset of last committed message + 1
        int endOffset = consumer.endOffsets(List.of(TOPIC_PART)).get(TOPIC_PART);

        // when we have processed the last entry, we unlock the latch.
        if (offset + payload.size() >= endOffset) {
            latch.countDown();
        }
}

问题是当Kafka主题为空时latch永远不会被解锁,因为该方法永远不会运行。主题为空时如何解锁?

【问题讨论】:

  • 这种情况下offsetendOffsetpayload.size()的值是多少?
  • 由于主题为空,因此consume()方法从未运行过。
  • 您应该使用 GetOffsetShell / AdminClient 列出主题偏移量,而不是初始化消费者。此外,您的主题大小永远不会改变吗? Kafka 主题没有任何“结束”,并且随着保留开始删除记录而不断增加

标签: java apache-kafka spring-kafka


【解决方案1】:

https://docs.spring.io/spring-kafka/docs/2.6.10/reference/html/#idle-containershttps://docs.spring.io/spring-kafka/docs/2.6.10/reference/html/#event-consumption

以下 Spring 应用程序事件由侦听器容器及其使用者发布:

...

ListenerContainerIdleEvent:在idleInterval(如果已配置)中未收到任何消息时发布。

...

ListenerContainerIdleEvent 具有以下属性:

source:发布事件的监听器容器实例。

container:侦听器容器或父侦听器容器,如果源容器是子容器。

id:侦听器 ID(或容器 bean 名称)。

idleTime:发布事件时容器空闲的时间。

topicPartitions:容器在事件生成时分配的主题和分区。

consumer:对 Kafka Consumer 对象的引用。例如,如果之前调用了消费者的 pause() 方法,则在收到事件时可以 resume()。

paused:容器当前是否处于暂停状态。有关详细信息,请参阅暂停和恢复侦听器容器。

public class Listener {

    @KafkaListener(id = "qux", topics = "annotated")
    public void listen4(@Payload String foo, Acknowledgment ack) {
        ...
    }

    @EventListener(condition = "event.listenerId.startsWith('qux-')")
    public void eventHandler(ListenerContainerIdleEvent event) {
        ...
    }

}

【讨论】:

    猜你喜欢
    • 1970-01-01
    • 1970-01-01
    • 2021-03-12
    • 2020-12-14
    • 2022-07-29
    • 1970-01-01
    • 2019-06-17
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多