【发布时间】:2018-08-30 03:17:07
【问题描述】:
我有一个用例,其中消费者需要收听主题 T1 上的某些事件(消费主题 T1 中的消息并搜索某些情况)。只有当它检测到某些事件时,才开始从主题 T2 消费并处理消息从 T2 检索。我们正在使用 spring kafka 2.1.7。我现在看不到这样做的方法 因为消费者会在 kafkalistener bean 出现时从 T2 开始阅读。我正在寻找有关如何实现这一点的建议。
【问题讨论】:
标签: spring-kafka
我有一个用例,其中消费者需要收听主题 T1 上的某些事件(消费主题 T1 中的消息并搜索某些情况)。只有当它检测到某些事件时,才开始从主题 T2 消费并处理消息从 T2 检索。我们正在使用 spring kafka 2.1.7。我现在看不到这样做的方法 因为消费者会在 kafkalistener bean 出现时从 T2 开始阅读。我正在寻找有关如何实现这一点的建议。
【问题讨论】:
标签: spring-kafka
您的每个@KafkaListener 应依赖于略有不同的ConcurrentKafkaListenerContainerFactory 实例,其中一个主题T2 应使用setAutoStartup(false) 进行配置。
因此,当满足 T1 的 @KafkaListener 中的条件时,您可以为第二个 @KafkaLisnter 调用 KafkaListenerEndpointRegistry.getListenerContainer(containerId).start()。 containerId 来自 @KafkaLisnter.id()。
【讨论】:
@KafkaListener 注释上覆盖 autoStartup。
@KafkaListener 将在自己的线程中工作。我认为你不能发送偏移量来提交到另一个线程。