【问题标题】:Make Kafka Consumer wait for Events让 Kafka Consumer 等待事件
【发布时间】:2018-08-30 03:17:07
【问题描述】:

我有一个用例,其中消费者需要收听主题 T1 上的某些事件(消费主题 T1 中的消息并搜索某些情况)。只有当它检测到某些事件时,才开始从主题 T2 消费并处理消息从 T2 检索。我们正在使用 spring kafka 2.1.7。我现在看不到这样做的方法 因为消费者会在 kafkalistener bean 出现时从 T2 开始阅读。我正在寻找有关如何实现这一点的建议。

【问题讨论】:

    标签: spring-kafka


    【解决方案1】:

    您的每个@KafkaListener 应依赖于略有不同的ConcurrentKafkaListenerContainerFactory 实例,其中一个主题T2 应使用setAutoStartup(false) 进行配置。

    因此,当满足 T1@KafkaListener 中的条件时,您可以为第二个 @KafkaLisnter 调用 KafkaListenerEndpointRegistry.getListenerContainer(containerId).start()containerId 来自 @KafkaLisnter.id()

    【讨论】:

    • 在下一个版本 (2.2) 中,您将能够直接在 @KafkaListener 注释上覆盖 autoStartup
    • 谢谢大家。在监听器上覆盖自动启动将非常有帮助。同时附加要求之一是主题 T2 和主题 T1 的偏移提交应该在同一个事务中。所以我在这里的理解是,在 2.2 发布之前我不能让容器管理事务,而是使用相同的事务管理器@方法级别,然后对 T1 和 T2 使用 template.sendOffsetsToTransaction 。这是正确的方法吗?
    • 我不这么认为。第二个@KafkaListener 将在自己的线程中工作。我认为你不能发送偏移量来提交到另一个线程。
    猜你喜欢
    • 1970-01-01
    • 2022-08-11
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2021-04-24
    • 2023-03-02
    • 2019-06-19
    • 1970-01-01
    相关资源
    最近更新 更多