【发布时间】:2021-10-26 22:23:03
【问题描述】:
考虑以下代码:
@Slf4j
@ExtendWith(MockitoExtension.class)
class ConnectionEventsConsumerTest {
@Test
public void testOnErrorResume() {
Flux.range(0, 5)
.doOnNext(event -> log.info("Processing - {}", event))
.flatMap(event -> processEvent(event)
.doOnSuccess(result -> log.info("Processed - {}", event))
.onErrorResume(t -> handleError(t, event))
)
.doOnError(t -> log.error("Exception propagated", t))
//.log()
.then()
.subscribe();
}
private Mono<Void> processEvent(Object object) {
return Mono.error(() -> new RuntimeException("test"));
//throw new RuntimeException("test");
}
private Mono<Void> handleError(Throwable throwable, Object object) {
log.error("Processing Failed - {}", object);
return Mono.empty();
}
}
方法 processEvent 返回 Mono.error 与抛出 Exception 时的输出完全不同。
代码原样(返回 Mono.error),我看到了我的预期,处理和处理的 300 次迭代失败,我看到没有传播异常。
17:33:19.853 [主要] 信息 com.playtika.services.catforce.pvp.service.kafka.connection.ConnectionEventsConsumerTest
- 处理 - 0 17:33:19.864 [main] 错误 com.playtika.services.catforce.pvp.service.kafka.connection.ConnectionEventsConsumerTest
- 处理失败 - 0 17:33:19.865 [main] INFO com.playtika.services.catforce.pvp.service.kafka.connection.ConnectionEventsConsumerTest
- 处理 - 1 17:33:19.866 [main] 错误 com.playtika.services.catforce.pvp.service.kafka.connection.ConnectionEventsConsumerTest
- 处理失败 - 1 17:33:19.866 [main] INFO com.playtika.services.catforce.pvp.service.kafka.connection.ConnectionEventsConsumerTest
- 处理 - 2 17:33:19.866 [main] 错误 com.playtika.services.catforce.pvp.service.kafka.connection.ConnectionEventsConsumerTest
- 处理失败 - 2 17:33:19.866 [main] INFO com.playtika.services.catforce.pvp.service.kafka.connection.ConnectionEventsConsumerTest
- 处理 - 3 17:33:19.866 [main] 错误 com.playtika.services.catforce.pvp.service.kafka.connection.ConnectionEventsConsumerTest
- 处理失败 - 3 17:33:19.866 [main] INFO com.playtika.services.catforce.pvp.service.kafka.connection.ConnectionEventsConsumerTest
- 处理 - 4 17:33:19.866 [main] 错误 com.playtika.services.catforce.pvp.service.kafka.connection.ConnectionEventsConsumerTest
- 处理失败 - 4
另一方面,如果我取消注释 throw,我会看到正在处理 Flux 中的单个项目,我没有看到来自 handleError 的消息并且我看到“异常传播”
17:35:53.950 [主要] 信息 com.playtika.services.catforce.pvp.service.kafka.connection.ConnectionEventsConsumerTest
- 处理 - 0 17:35:53.968 [main] 错误 com.playtika.services.catforce.pvp.service.kafka.connection.ConnectionEventsConsumerTest
- 异常传播 java.lang.RuntimeException: 测试
如果这是设计使然,那么 flatMap 的最佳做法是什么?想到的简单解决方案是用 try-catch 包围 flatMap 的内容,以将异常包装在 Mono.error 中。虽然它有效,但它不优雅且过于手动,很可能被遗忘。
【问题讨论】:
-
这能回答你的问题吗? Correct way of throwing exceptions with Reactor我觉得解释的很好
-
我在发布该问题之前确实阅读了这篇文章,当时我正在调查。不幸的是,我认为它没有多大帮助。我对那篇文章的解释基本上可以概括为:不要使用异常,使用 Mono.error。这已经是我们正在做的事情,但根据定义,异常可以从任何地方抛出,说不使用异常是不现实的。
-
知道了。我去看看
标签: java spring-webflux project-reactor