【发布时间】:2019-05-30 13:26:59
【问题描述】:
我正在使用 Reactive Spring Cloud Stream,但在创建没有输出的 StreamListener 时遇到了问题。只要没有收到格式错误的消息,以下代码就可以工作。当收到格式错误的消息时,通量关闭。
@StreamListener
public void handleMessage(@Input(MessagingConfig.INPUT) Flux<String> payloads) {
payloads.flatMap(objectToSave -> reactiveMongoTemplate.insert(objectToSave)).subscribe();
}
如果我理解正确,最好让框架订阅 Flux,而不是手动订阅它。当侦听器有输出时,这不是问题,因为我可以像这样简单地返回通量:
@StreamListener
@Output(MessagingConfig.OUTPUT)
public Flux<String> handleMessage(@Input(MessagingConfig.INPUT) Flux<String> payloads) {
return payloads.flatMap(objectToSave -> reactiveMongoTemplate.insert(objectToSave));
}
该框架似乎以一种在返回时不会关闭通量的方式处理错误消息。当侦听器未指定输出时,有什么方法可以让框架处理通量?
【问题讨论】:
标签: java reactive-programming spring-webflux spring-cloud-stream project-reactor