【问题标题】:Implement Reactive Kafka Listener in Spring Boot application在 Spring Boot 应用程序中实现 Reactive Kafka Listener
【发布时间】:2023-03-24 01:43:01
【问题描述】:

我正在尝试在我的 Spring 启动应用程序中实现反应式 kafka 消费者,我正在查看这些示例: https://github.com/reactor/reactor-kafka/blob/master/reactor-kafka-samples/src/main/java/reactor/kafka/samples/SampleScenarios.java

而且在响应式 kafka 中似乎还没有对 Spring 的支持

我了解 kafka 监听器如何在 Spring 中的非反应式 kafka API 中工作:最简单的解决方案是为 ConcurrentKafkaListenerContainerFactory 和 ConsumerFactory 配置 bean,然后使用 @KafkaListener 注解,瞧

但我现在不确定如何在 Spring 中正确使用响应式 kafka。

基本上我需要一个主题的听众。我应该创建自己的某种循环或调度程序吗? 或者,也许我错过了一些东西。谁能分享他们的知识和最佳实践?

【问题讨论】:

  • 您是否在 Spring 中为 Apache Kafka 项目看到了 ReactiveKafkaConsumerTemplategithub.com/spring-projects/spring-kafka/blob/master/…
  • @KafkaListener 的响应式支持已列入明年的路线图。现在,我们所拥有的只是 Artem 提到的轻量级包装器。也就是说,为响应式(或任何异步)消费者管理分区偏移特别困难。
  • @ArtemBilan 感谢您的链接,将对此进行调查

标签: java kafka-consumer-api spring-kafka reactive-kafka reactor-kafka


【解决方案1】:

我还没有现成的解决方案,但我正在尝试这个(Kotlin 代码,Spring Boot)。有人在这里发布了部分代码sn-p https://github.com/reactor/reactor-kafka/issues/100

@EventListener(ApplicationStartedEvent::class)
fun onSomeEvent() {
    kafkaReceiver
        .receive()
        .doOnNext { record ->
            val myEvent = record.value()
            processMyEvent(myEvent).thenEmpty {
                record.receiverOffset().acknowledge()
            }
        }
        .doOnError {
            /* todo */
        }
        .subscribe()
}

查看其他堆栈溢出问题。那里没有太多,但也许会给你一些想法

【讨论】:

  • 你能分享到 reactor kafka 示例或教程的链接吗?
猜你喜欢
  • 1970-01-01
  • 1970-01-01
  • 2017-01-19
  • 1970-01-01
  • 2020-04-24
  • 2022-10-23
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
相关资源
最近更新 更多