【问题标题】:Kafka - understand when all consumers performed seek to LATESTKafka - 了解所有消费者何时执行寻求最新
【发布时间】:2021-07-22 15:58:36
【问题描述】:

我在一个消费者组中有多个消费者(单个 Spring Boot 应用程序)。我使用onPartitionsAssigned 回调将每个消费者偏移量重置为最新(对于分配的分区)。

问题是我需要知道消费者组中的所有消费者何时执行此搜索并在此之后执行一些逻辑。

我目前的理解是每个消费者都独立执行搜索和开始处理 - 即没有任何共同的同步点。

如果有人能提供指导是否可行,那就太好了。

更新。让我解释一下为什么需要它。 我有一个逻辑来发送 HTTP 请求(到另一个服务)以请求将一些数据提交给 Kafka。但在发送此请求之前,我需要确保所有消费者都已经处于最新的偏移量。因为如果此请求较早发送 - 此第二个服务提交的某些数据可能会丢失 - 即,如果它是在消费者完成重置为 LATEST 偏移量之前提交的。

【问题讨论】:

  • 您是否只需要知道您的消费者最晚的时间?然后进行手动操作?或者你想在编程上做这件事?
  • 正确。是的 - 在编程中。

标签: apache-kafka spring-kafka


【解决方案1】:

没有内置任何东西;您可以将 CountDownLatch 设置为容器的并发,并为每个容器倒计时。

如果并发可能发生变化,您可以从KafkaListenerEndpointRegistry bean 获取对并发容器的引用。

int count = ((ConcurrentMessageListenerContainer<?, ?>) registry.getListenerContainer(id))
    .getConcurrency();

【讨论】:

  • 什么时候可以插入这个 CountDownLatch?查看 spring-kafka 代码,我看不到任何允许指定回调类型以了解消费者何时成功寻求指定偏移量的地方(在我的情况下是最新的)。
  • 正如我所说,没有任何内置功能。您需要将闩锁放在某个实用程序类中,并在您的侦听器中调用await(....),在onPartitionsAssigned() 中调用countDown()(每个线程只调用一次)。
  • callback.seekToEnd不同步的问题 - 重置offset的请求是异步发送的。所以我们不能在这个方法之后立即调用countDown
  • 你为什么这么说?回调立即对消费者执行搜索。如果您看到不同的内容,请编辑问题以显示您的代码。
  • 不,不是——它只是设置 FetchState、AWAIT_RESET 和 resetStrategy。稍后执行获取最新偏移量的实际请求。这不是我的代码 - 这是 spring-kafka/kafka 代码。
猜你喜欢
  • 2017-08-17
  • 1970-01-01
  • 2019-11-06
  • 1970-01-01
  • 1970-01-01
  • 2016-10-31
  • 2017-11-16
  • 2017-12-21
  • 2018-03-01
相关资源
最近更新 更多