【问题标题】:How to use "Kafka Streams Binder" with "Functional Style" and DI?如何将“Kafka Streams Binder”与“Functional Style”和DI一起使用?
【发布时间】:2019-12-25 10:02:14
【问题描述】:

https://cloud.spring.io/spring-cloud-static/spring-cloud-stream-binder-kafka/3.0.0.M3/reference/html/spring-cloud-stream-binder-kafka.html#_programming_model 显示了一个示例,其中可以使用属性spring.cloud.stream.bindings.process_in.destination 设置输入主题

现在我想使用依赖注入,例如

@Bean
public java.util.function.Consumer<KStream<Object, String>> process(JavaMailSender mailSender) {...}

启动应用程序时(基于 Spring Boot),属性 spring.cloud.stream.bindings.process_in.destination 被忽略,而是订阅输入主题 input

编辑:这是 Kotlin 代码(没有导入)

Mailer.kt:

@Configuration
class Mailer {
    @Bean
    fun sendMail(/*mailSender: JavaMailSender*/) = Consumer<KStream<Any, Mail>> { input ->
        input.foreach { _, mail -> println("mail = $mail") }
    }
}

Mail.kt:

data class Mail(var from: String = "", var to: String = "", var subject: String = "", var body: String = "")

Application.kt:

@SpringBootApplication
class Application

fun main(args: Array<String>) {
    runApplication<Application>(*args) {
    }
}

application.yml::

spring.cloud.stream:
  bindings.sendMail_in.destination: mail
  kafka.binder.configuration.listeners: PLAINTEXT://localhost:9092

【问题讨论】:

  • 您好,您可以分享更多代码或配置吗?我当然想看看你所描述的。 kafka 流绑定器中的新功能特性目前处于里程碑阶段。我们希望确定您尝试自动接线方式的任何问题。
  • @sobychacko 请参阅上面的附加 Kotlin 代码。
  • 只有在取消注释依赖注入时才注意到这个问题吗?

标签: spring-cloud spring-kafka spring-cloud-stream


【解决方案1】:

活页夹中有一些问题没有正确autowire 提供给函数/消费者bean 的bean。最新的快照解决了这些问题。请确保您使用的是最新的快照 (3.0.0.BUILD-SNAPSHOT)。这是一个sample application,它适用于您提供的相同场景。

【讨论】:

  • 谢谢你,@sobychacko。使用快照工作正常。顺便说一句,我也尝试了新的 Bean DSL,它也可以正常工作。
猜你喜欢
  • 2020-06-03
  • 2017-07-03
  • 2019-10-24
  • 2023-04-02
  • 2017-06-12
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 2015-12-12
相关资源
最近更新 更多