【问题标题】:combining inbound channel adapters and stream emitter结合入站通道适配器和流发射器
【发布时间】:2018-03-16 12:24:10
【问题描述】:

我正在使用 Spring Cloud Stream Reactive 并且遇到了问题。

考虑以下代码:

@InboundChannelAdapter("list", poller = [(Poller(fixedDelay = "\${thetis.listInterval:60000}"))])
fun timerMessageSource(): Flux<Center> = config.centers.toFlux()

我的目标是生成一种通量,该通量应被以下形式的某种东西消耗:

 @StreamListener("list") @Output("download")
 fun processList(center: Center): Flux<Product> = ...

但这似乎不起作用。通道适配器正确生成通量,但它显示以下内容:

 org.springframework.messaging.converter.MessageConversionException: Could not read JSON: Unrecognized token 'FluxIterable': was expecting ('true', 'false' or 'null')

我想我应该在入站通道适配器上添加一个StreamEmitter 注释,但这似乎不起作用。

实现这种流程的正确方法是什么?

谢谢你和问候,

费尔南多

【问题讨论】:

    标签: kotlin spring-integration spring-cloud-stream reactive


    【解决方案1】:

    例外情况非常明显:您生成了一个Flux 对象作为消息的payload 发送到list 通道,以发送到消息中间件上的目标目的地。并且它完全纠正了Flux 与要序列化的 JSON 不兼容。

    另一方面,我不确定 Kotlin 是什么并将您的代码编译为 Java,但我们有类似这样的开箱即用的东西(用于 Java):

    @StreamEmitter
    @Output("list")
    public Flux<Center> timerMessageSource() {
         return config.centers.toFlux();
    }
    

    flux 中每个发出的项目都将被序列化并作为记录或消息发送到 Binder。当然,如果您的 Center 与 JSON 兼容。为此,您需要一个 spring-cloud-stream-reactive 依赖项。

    是的,@InboundChannelAdapter 在这里没有帮助甚至打扰。

    如果您担心一些周期性的发射,如果应该考虑在Project Reactor 中提供调度支持。

    【讨论】:

    • 谢谢,现在可以使用了。我还在研究这些东西,还有很多我不完全理解的东西:)
    猜你喜欢
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2014-06-29
    • 2016-01-18
    • 1970-01-01
    • 2023-03-12
    • 1970-01-01
    相关资源
    最近更新 更多