【问题标题】:Spring Cloud Stream error handling using RabbitMQ and Reactor使用 RabbitMQ 和 Reactor 处理 Spring Cloud Stream 错误
【发布时间】:2018-03-16 11:49:33
【问题描述】:

我试图了解在将 Spring Cloud Stream 与 RabbitMQ 和 Reactor 一起使用时处理错误的正确方法。当接收到的消息得到正确验证时,一切都运行良好,并且链能够处理消息。当发生错误时,链会中断并终止。

说明正在发生的事情(稍微简化):

@StreamListener
@Output(Processor.OUTPUT)
public Flux<byte[]> receive(@Input(Processor.INPUT) Flux<Message> input) {
    input.map(this::transformDataToJSON)
         .onErrorResume(MessageValidationException.class, this::processValidationException)
         .map(m -> m.getBytes())
}

发生了一些简单的转换,可能会导致抛出验证异常。 onErrorResume 将处理异常并且流程继续。但是在处理异常时,链终止,因此它不会接收到新消息。

由于这不是很可靠,我正在寻找这种情况下的最佳做法。处理这种情况的正确方法是什么?验证和跳过会是更好的处理方式,还是有另一种方式来正确处理此类流异常?

【问题讨论】:

    标签: spring rabbitmq spring-cloud-stream reactor


    【解决方案1】:

    我得出的结论是,这种方法不正确。如其他文章所述,onError(及其所有变体)适用于特殊情况。

    我现在采取的方法是在数据发送到链下之前验证数据,如果数据没有验证它正在由不同的订阅者处理。

    @StreamListener
    @Output(Processor.OUTPUT)
    public Flux<byte[]> receive(@Input(Processor.INPUT) Flux<Message> input) {
        input.filter(b -> !validate(b)).doOnNext(this::handleInvalidData).subscribe();    
        input.filter(this::validate)
         .map(this::transformDataToJSON)
         .map(m -> m.getBytes())
    }
    

    这没有优化,因此欢迎改进。

    【讨论】:

      猜你喜欢
      • 2017-11-24
      • 1970-01-01
      • 2019-10-03
      • 2018-01-24
      • 2018-06-05
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 2017-11-24
      相关资源
      最近更新 更多