【问题标题】:Spring Cloud Stream Reactive Listener Without Output没有输出的 Spring Cloud Stream 反应式侦听器
【发布时间】:2019-05-30 13:26:59
【问题描述】:

我正在使用 Reactive Spring Cloud Stream,但在创建没有输出的 StreamListener 时遇到了问题。只要没有收到格式错误的消息,以下代码就可以工作。当收到格式错误的消息时,通量关闭。

@StreamListener
public void handleMessage(@Input(MessagingConfig.INPUT) Flux<String> payloads) {
    payloads.flatMap(objectToSave -> reactiveMongoTemplate.insert(objectToSave)).subscribe();
}

如果我理解正确,最好让框架订阅 Flux,而不是手动订阅它。当侦听器有输出时,这不是问题,因为我可以像这样简单地返回通量:

@StreamListener
@Output(MessagingConfig.OUTPUT)
public Flux<String> handleMessage(@Input(MessagingConfig.INPUT) Flux<String> payloads) {
    return payloads.flatMap(objectToSave -> reactiveMongoTemplate.insert(objectToSave));
}

该框架似乎以一种在返回时不会关闭通量的方式处理错误消息。当侦听器未指定输出时,有什么方法可以让框架处理通量?

【问题讨论】:

    标签: java reactive-programming spring-webflux spring-cloud-stream project-reactor


    【解决方案1】:

    考虑切换到使用我们拥有recently adopted 的 Spring Cloud Function (SCF) 编程模型。 基本上,只要你有最新的代码库(2.1.0.RC4 是最新的,RELEASE 是几天之后)你就可以了。以下是使用 SCF 编程模型的代码示例:

    @SpringBootApplication
    @EnableBinding(Sink.class)
    public class SampleReactiveConsumer {
    
        public static void main(String[] args) {
            SpringApplication.run(SampleReactiveConsumer.class, 
                       "--spring.cloud.stream.function.definition=consume");
        }
    
        @Bean
        public Consumer<Flux<String>> consume(){
            return payloads -> payloads.flatMap(objectToSave -> reactiveMongoTemplate.insert(objectToSave)).subscribe();
        }
    }
    

    您还可以从类路径中删除响应式模块,因为我们也在考虑一起弃用它

    【讨论】:

    • 如果我想使用自己的接口来声明输入/输出通道而不是使用提供的 Source/Sink/Processor 接口怎么办?如何将自定义界面中的输入/输出映射到函数?是否有“spring.cloud.stream.function.definition”的替代语法可以指定频道?
    • 目前函数模型只支持标准的 Source、Processor 和 Sink,这是有原因的——主要是为了推广微服务风格的架构,这与使用 binder 构建通用消息传递应用程序不同。换句话说,每个微服务都有一个消息监听器,上面提到的三个接口充分涵盖了这一点。
    【解决方案2】:

    如果您真的想避免 Oleg 的回答中提到的 SCF,您可以尝试以下 hacky 方法。

    const val IN = "input"
    const val OUT = "dummy-output"
    
    interface Channels {
        @Input(IN)
        fun input(): MessageChannel
    
        @Output(OUT)
        fun output(): MessageChannel
    }
    
    @EnableBinding(Channels::class)
    class MsgList {
        @StreamListener
        @Output(OUT)
        fun receive(@Input(IN) messages: Flux<String>): Flux<Void> {
            return messages
                .doOnNext { if (it == "err") throw IllegalStateException("err") }
                .doOnNext { println(it) }
                .flatMap { Mono.empty<Void>() }
        }
    }
    

    将创建输出绑定,但不会通过任何消息。在 RabbitMQ 的情况下,这意味着 - 会出现虚拟交换,但不会创建队列。

    还会按您的预期处理错误。在上面的例子中,你可以发送 3 条消息,“ok”,“err”,“ok2”,你会在屏幕上看到“ok”,然后是异常,然后是“ok2”。将正确处理“ok2”和任何后续有效消息。

    【讨论】:

    • 你说得对,这对我来说确实很老套。我最终使用了非反应式 StreamListener。不幸的是,Spring Can Stream Reactive 已被弃用,因为许多“微服务”没有 Spring Cloud Function 所假设的一个输入/输出
    猜你喜欢
    • 1970-01-01
    • 1970-01-01
    • 2020-01-31
    • 1970-01-01
    • 1970-01-01
    • 2021-06-27
    • 2018-02-16
    • 2018-01-31
    • 2018-06-23
    相关资源
    最近更新 更多