【问题标题】:Handling connection errors in Spring reactive webclient处理 Spring 反应式 Web 客户端中的连接错误
【发布时间】:2021-04-26 16:36:01
【问题描述】:

我有一个 spring webclient 对外部服务进行 http 调用,并由响应式断路器工厂 (resilience4J impl) 提供支持。当客户端建立连接但响应失败(任何内部服务器或 4XX 错误)时,WebClient 和断路器按预期运行。但是,如果客户端无法建立连接,无论是 Connection Refused 还是 UnknownHost,它都会开始崩溃。

  1. 我似乎无法在 Web 客户端中捕获错误消息并触发断路器。
  2. 断路器永远不会打开并引发 TimeoutException。
    java.util.concurrent.TimeoutException: Did not observe any item or terminal signal within 1000ms in 'circuitBreaker' (and no fallback has been configured)

来自 Web 客户端的错误。
io.netty.channel.AbstractChannel$AnnotatedConnectException: Connection refused: localhost/127.0.0.1:9000

这是我的代码。我也粘贴了错误来源。我试图将 ConnectException 映射到我的自定义异常以供断路器拾取,但是它不起作用。没有远程服务器的响应,有人可以帮我处理错误吗?

 public Mono<String> toSink(
  Envelope envelope, ConsumerConfiguration webClientConfiguration) {

return getWebClient()
    .post()
    .uri(
        uriBuilder -> {
          if (webClientConfiguration.getPort() != null) {
            uriBuilder.port(webClientConfiguration.getPort());
          }
          return uriBuilder.path(webClientConfiguration.getServiceURL()).build();
        })
    .headers(
        httpHeaders ->
            webClientConfiguration.getHttpHeaders().forEach((k, v) -> httpHeaders.add(k, v)))
    .bodyValue(envelope.toString())
    .retrieve()
    .bodyToMono(Map.class)
    // Convert 5XX internal server error and throw CB exception
    .onErrorResume(
        throwable -> {
          log.error("Inside the error resume callback of webclient {}", throwable.toString());
          if (throwable instanceof WebClientResponseException) {
            WebClientResponseException r = (WebClientResponseException) throwable;
            if (r.getStatusCode().is5xxServerError()) {
              return Mono.error(new CircuitBreakerOpenException());
            }
          }
          return Mono.error(new CircuitBreakerOpenException());
        })
    .map(
        map -> {
          log.info("Response map:{}", Any.wrap(map).toString());
          return Status.SUCCESS.name();
        })
    .transform(
        it -> {
          ReactiveCircuitBreaker rcb =
              reactiveCircuitBreakerFactory.create(
                  webClientConfiguration.getCircuitBreakerId());
          return rcb.run(
              it,
              throwable -> {
                /// "Did not observe any item or terminal signal within 1000ms.. " <--- Error here
                log.info("throwable in CB {}", throwable.toString());
                if (throwable instanceof CygnusBusinessException) {
                  return Mono.error(throwable);
                }
                return Mono.error(
                    new CircuitBreakerOpenException(
                        throwable, new CygnusContext(), null, null, null));
              });
        })
    ///io.netty.channel.AbstractChannel$AnnotatedConnectException: Connection refused: localhost/127.0.0.1:9000  <-- Error prints here    
    .onErrorContinue((throwable, o) -> log.error(throwable.toString()))
    .doOnError(throwable -> log.error("error from webclient:{}", throwable.toString()));

}

【问题讨论】:

    标签: spring-webflux project-reactor spring-webclient reactor-netty


    【解决方案1】:

    我通过添加一个 onErrorContinue 块并将异常重新抛出为自定义来修复它,该异常在我的断路器代码中得到处理。

    .onErrorContinue(
            (throwable, o) -> {
              log.info("throwable => {}", throwable.toString());
              if (throwable instanceof ReadTimeoutException || throwable instanceof ConnectException) {
                throw new CircuitBreakerOpenException();
              }
            })
    

    【讨论】:

      【解决方案2】:

      我将对您的解决方案提出以下建议:

      1- onErrorContinue 的另一种变体接受谓词,因此您可以定义此运算符将应用于哪些异常 - Docs

      2- 返回 Mono.error 而不是从 Mono/Flux 运算符中抛出 RuntimeExceptions。另一个 stackoverflow 答案很好地涵盖了这一点 - Stackoverflow

      3- 使用副作用运算符执行日志记录 (doOn*)

      .doOnError(throwable -> log.info("throwable => {}", throwable.toString()))
      .onErrorResume(throwable -> throwable instanceof ReadTimeoutException || throwable instanceof ConnectException,
                             t -> Mono.error(new CircuitBreakerOpenException()))
      

      希望这有帮助。

      【讨论】:

      • 我应用了这个解决方案,虽然它有效,但我对断路器有一些副作用。我仍然收到以下 TimeoutException java.util.concurrent.TimeoutException: Did not observe any item or terminal signal within 1000ms in 'circuitBreaker' (and no fallback has been configured)。此外,在发出请求之前引发的任何异常(例如 POST 的不正确正文)都不会被捕获。
      猜你喜欢
      • 2018-04-22
      • 1970-01-01
      • 1970-01-01
      • 2012-03-30
      • 1970-01-01
      • 2022-10-14
      • 1970-01-01
      • 2018-03-21
      • 1970-01-01
      相关资源
      最近更新 更多