【问题标题】:Exception thrown in a flatMap are ignored by onErrorResume operatoronErrorResume 运算符忽略 flatMap 中引发的异常
【发布时间】:2021-10-26 22:23:03
【问题描述】:

考虑以下代码:

@Slf4j
@ExtendWith(MockitoExtension.class)
class ConnectionEventsConsumerTest {

    @Test
    public void testOnErrorResume() {
        Flux.range(0, 5)
                .doOnNext(event -> log.info("Processing -  {}", event))
                .flatMap(event -> processEvent(event)
                        .doOnSuccess(result -> log.info("Processed - {}", event))
                        .onErrorResume(t -> handleError(t, event))
                )
                .doOnError(t -> log.error("Exception propagated", t))
                //.log()
                .then()
                .subscribe();
    }

    private Mono<Void> processEvent(Object object) {
        return Mono.error(() -> new RuntimeException("test"));
        //throw new RuntimeException("test");
    }
    
    private Mono<Void> handleError(Throwable throwable, Object object) {
        log.error("Processing Failed - {}", object);
        
        return Mono.empty();
    }
    
}

方法 processEvent 返回 Mono.error 与抛出 Exception 时的输出完全不同。

代码原样(返回 Mono.error),我看到了我的预期,处理和处理的 300 次迭代失败,我看到没有传播异常。

17:33:19.853 [主要] 信息 com.playtika.services.catforce.pvp.service.kafka.connection.ConnectionEventsConsumerTest

  • 处理 - 0 17:33:19.864 [main] 错误 com.playtika.services.catforce.pvp.service.kafka.connection.ConnectionEventsConsumerTest
  • 处理失败 - 0 17:33:19.865 [main] INFO com.playtika.services.catforce.pvp.service.kafka.connection.ConnectionEventsConsumerTest
  • 处理 - 1 17:33:19.866 [main] 错误 com.playtika.services.catforce.pvp.service.kafka.connection.ConnectionEventsConsumerTest
  • 处理失败 - 1 17:33:19.866 [main] INFO com.playtika.services.catforce.pvp.service.kafka.connection.ConnectionEventsConsumerTest
  • 处理 - 2 17:33:19.866 [main] 错误 com.playtika.services.catforce.pvp.service.kafka.connection.ConnectionEventsConsumerTest
  • 处理失败 - 2 17:33:19.866 [main] INFO com.playtika.services.catforce.pvp.service.kafka.connection.ConnectionEventsConsumerTest
  • 处理 - 3 17:33:19.866 [main] 错误 com.playtika.services.catforce.pvp.service.kafka.connection.ConnectionEventsConsumerTest
  • 处理失败 - 3 17:33:19.866 [main] INFO com.playtika.services.catforce.pvp.service.kafka.connection.ConnectionEventsConsumerTest
  • 处理 - 4 17:33:19.866 [main] 错误 com.playtika.services.catforce.pvp.service.kafka.connection.ConnectionEventsConsumerTest
  • 处理失败 - 4

另一方面,如果我取消注释 throw,我会看到正在处理 Flux 中的单个项目,我没有看到来自 handleError 的消息并且我看到“异常传播”

17:35:53.950 [主要] 信息 com.playtika.services.catforce.pvp.service.kafka.connection.ConnectionEventsConsumerTest

  • 处理 - 0 17:35:53.968 [main] 错误 com.playtika.services.catforce.pvp.service.kafka.connection.ConnectionEventsConsumerTest
  • 异常传播 java.lang.RuntimeException: 测试

如果这是设计使然,那么 flatMap 的最佳做法是什么?想到的简单解决方案是用 try-catch 包围 flatMap 的内容,以将异常包装在 Mono.error 中。虽然它有效,但它不优雅且过于手动,很可能被遗忘。

【问题讨论】:

  • 这能回答你的问题吗? Correct way of throwing exceptions with Reactor我觉得解释的很好
  • 我在发布该问题之前确实阅读了这篇文章,当时我正在调查。不幸的是,我认为它没有多大帮助。我对那篇文章的解释基本上可以概括为:不要使用异常,使用 Mono.error。这已经是我们正在做的事情,但根据定义,异常可以从任何地方抛出,说不使用异常是不现实的。
  • 知道了。我去看看

标签: java spring-webflux project-reactor


【解决方案1】:

创建/返回Mono 的方法不应以这种方式抛出异常。由于异常是在Mono被组装(创建)之前抛出的,所以flatMap中的后续操作符不可能生效,因为它们需要Mono来操作。

如果您无法控制 processEvent() 方法来修复其行为,那么您可以用 Mono.defer 包装它,这将确保即使在组装期间引发的错误也将通过 Mono 内部传播flatMap:

Flux.range(0, 5)
    .doOnNext(event -> log.info("Processing -  {}", event))
    .flatMap(event -> Mono.defer(() -> processEvent(event))
                .doOnSuccess(result -> log.info("Processed - {}", event))
                .onErrorResume(t -> handleError(t, event)))
    .doOnError(t -> log.error("Exception propagated", t))


private Mono<Void> processEvent(Object object) {
    throw new RuntimeException("test");
}

请注意,在 mapdoOnNext 等其他中间运算符中,您可以随意以丑陋的方式抛出异常,因为 Reactor 可以将它们转换为正确的错误信号,因为此时 Mono 已经在进行中。

【讨论】:

    猜你喜欢
    • 1970-01-01
    • 2021-02-25
    • 2022-06-25
    • 2010-11-20
    • 1970-01-01
    • 2023-04-06
    • 2018-09-01
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多