【问题标题】:Spring Web Flux - WebClient - exchange - block - how to release the connection to the connection pool?Spring Web Flux - WebClient - exchange - block - 如何释放连接到连接池?
【发布时间】:2020-12-25 07:05:53
【问题描述】:

当前:

我正在使用spring-webflux-5.2.8.RELEASE,这是工作“很好”:

httpStatus = webClient
    .post()
    .uri(someUri)
    .headers(someHeaders)
    .bodyValue(someBody)
    .exchange()
    .map(ClientResponse::statusCode)
    .timeout(someTimeout)
    .doOnError(doSomething())
    .onErrorResume(ex -> Mono.empty())
    .block();

问题:

当返回错误时,没有问题,因为连接被破坏并且没有放回连接池:

调试 r.n.resources.PooledConnectionProvider - [id: 0xa23f78ad, L:/127.0.0.1:7524 ! R:localhost/127.0.0.1:8443] 频道已关闭,现在为 0 活动连接和 0 个非活动连接

但是当我得到一个成功的响应时,下一个 post 将失败/超时:

java.util.concurrent.TimeoutException:没有观察到任何项目或 'map' 中 10000 毫秒内的终端信号(并且没有回退 配置)

由于我需要进行故障排除,我使用了一个只有 1 个连接的修复连接池,如下所示:

@Bean
public WebClient.Builder webClientBuilder(){
    
    HttpClient httpClient = HttpClient.create(ConnectionProvider.create("pool", 1));
    
    return WebClient.builder()
        .clientConnector(new ReactorClientHttpConnector(httpClient));
}

我猜(我可能是错的,因为我对 webclient 和响应式世界完全陌生)问题是它在成功获得响应后 不释放 连接,我认为它是由于this

当通过 WebClient exchange() 方法使用 ClientResponse 时, 你必须确保身体被消耗或释放......

已经尝试过的:

我尝试像这样执行 releaseBody()(但这不起作用,因为下一个 post 仍然失败):

.map(clientResponse -> { 
    HttpStatus statusCode = clientResponse.statusCode();
    clientResponse.releaseBody();
    return statusCode;
})

P.S 我需要使用 block()。这无法更改,因为我需要响应才能继续前进,我只需要获取状态代码。我正在使用 WebClient,因为我在某处读到了将弃用其余模板以支持 ... WebClient。我希望有人可以提供帮助。

更新 1:

我启用了指标,但确实没有释放连接:

reactor_netty_connection_provider_fixedPool_total_connections{id="1591603494",remote_address="localhost:8443",} 1.0 reactor_netty_connection_provider_fixedPool_active_connections{id="1591603494",remote_address="localhost:8443",} 1.0

更新 2

找到这个:https://github.com/spring-projects/spring-framework/issues/20474!

更新 3

我尝试使用默认连接数(即 500),我注意到在每次 发布 后活动连接数不断增加 :-(

[reactor-http-nio-2] 调试 r.n.resources.PooledConnectionProvider - [id: 0x2316e048, L:/127.0.0.1:32787 - R:localhost/127.0.0.1:8443] 通道已连接,现在有 7 个活动连接和 0 个非活动连接

【问题讨论】:

  • 如果您的应用程序是一个完全反应式的应用程序,您在任何情况下都不应阻止。这可能会给您带来极差的性能。 need to use the block(). That can't be changed as i need the response to move on, and i just need to get the status code. 这句话毫无意义。你不应该阻止,你应该对响应进行平面映射
  • 感谢托马斯的意见。我现在无法将其更改为 flatMap 并且我的应用程序不是响应式的,只是使用 WebClient “is”的那部分。最坏的情况是更改为 RestTemplate,但我想我刚刚找到了解决方案。无论如何,感谢您的回复。干杯伙伴。
  • 很好,你澄清了它。如果非反应性,则允许阻塞。我只是想排除某些事情。
  • 你可以试试clientResponse.bodyToMono(Void.class);而不是releaseBody
  • releaseBody 更好,因为如果数据通过,它不会关闭连接,根据 rstoyanchev here。我的错误是我没有等到releaseBody 完成后才返回。

标签: spring-webflux reactor-netty


【解决方案1】:

这对我有用:

HttpStatus httpStatus = null;
Mono<HttpStatus> monoHttpStatus  = null;
:           
WebClient webClient = xxx.getWebClient();
:       
try {
    monoHttpStatus  = webClient
        .post()
        .uri(someUri)
        .headers(someHeaders)
        .bodyValue(someBody)
        .exchange()
        .map(clientResponse -> 
            clientResponse.releaseBody().thenReturn(clientResponse.statusCode()))
        .timeout(someTimeout)
        .doOnError(Exception.class, e -> logger.error("An exception has occurred: ", e))
        .onErrorResume(ex -> Mono.empty())
        .block();
                
        if(monoHttpStatus != null)
            httpStatus = monoHttpStatus.block();            
}
:

//SomeOtherClass
@Autowired
private WebClient.Builder webClientBuilder;
public WebClient getWebClient() {
        
    :
    
    webClient= webClientBuilder.baseUrl(baseUrl)
                   .build();
    :   
    return webClient;
}

我现在在 post 完成后看到这个:

调试 reactor.util.Loggers$Slf4JLogger.debug[249] [reactor-http-nio-1] [id: 0x6b0568a3, L:/127.0.0.1:10168 - R:localhost/127.0.0.1:8443] 发布频道
调试 reactor.util.Loggers$Slf4JLogger.debug[254] [reactor-http-nio-1] [id: 0x6b0568a3, L:/127.0.0.1:10168 - R:localhost/127.0.0.1:8443] 通道已清理,现在有 0 个活动连接 和 1 个非活动连接

【讨论】:

  • 是的,现在可以了:)
  • 嗨@VioletaGeorgieva,我有几个“疑问”,它太长了不适合cmets,因此我更新了我的问题。你有时间可以看看吗?提前致谢。
  • 一般情况下,如果可以的话,我建议您远离exchange。请改用retrieve。例如,在您的用例中,您可以使用 .retrieve() + .toBodilessEntity()
  • 好的,当我读到retrieve() 无权访问statusCode() 时,我会检查一下。在我的其他微服务中,我需要使用 exchange(),因为我需要读取状态码和响应,所以我这样做了.flatMap(response -&gt; response.toEntity(typeReference)),根据this doc 很好,因为我正在使用toEntity .现在,你的回复吓到我了,希望我这个新手不会有其他的“惊喜”。
  • toBodilessEntity 完全符合您的需要 - 释放传入数据并让您访问标题、状态代码等。对于您的其他用例,您可以执行 retrieve + toEntity
猜你喜欢
  • 2012-04-22
  • 2020-08-06
  • 2014-08-06
  • 2013-05-01
  • 2016-10-24
  • 1970-01-01
  • 2015-12-04
  • 2015-07-29
  • 1970-01-01
相关资源
最近更新 更多