【问题标题】:Closing Reactor Netty connection on error status codes在错误状态代码上关闭 Reactor Netty 连接
【发布时间】:2020-07-05 02:41:50
【问题描述】:

我正在通过 Spring Webflux 框架使用 Reactor Netty,以便将数据发送到远程内容交付网络。当客户端请求完成时,Reactor Netty 的默认行为是保持连接处于活动状态并将其释放回底层连接池。

一些内容交付网络建议对某些类型的状态代码(例如 500 内部服务器错误)重新解析 DNS。为此,我添加了自定义NettyDnsNameResolverDnsCache,但我也需要关闭连接,否则会被释放回池中,DNS不会被重新解析。

如何关闭错误状态代码的连接?

到目前为止,我通过在 Reactor Netty 的 TcpClient 中添加 ConnectionObserver 提出了以下解决方法:

TcpClient tcpClient = TcpClient.create()
        .observe((connection, newState) -> {
            if (newState == State.RELEASED && connection instanceof HttpClientResponse) {
                HttpResponseStatus status = ((HttpClientResponse) connection).status();
                if (status.codeClass() != HttpStatusClass.SUCCESS) {
                    connection.dispose();
                }
            }
        });

即,如果连接已被释放(即放回连接池)并且释放是由带有不成功状态码的 HTTP 客户端响应引起的,则关闭连接。

这种方法感觉很笨拙。如果连接在错误状态码后被释放,并且观察者正在关闭该连接,那么新请求是否可以并行获取相同的连接?框架是否在内部优雅地处理事情,或者这是使上述方法无效的竞争条件?

提前感谢您的帮助!

【问题讨论】:

    标签: java netty reactor-netty


    【解决方案1】:

    最好使用doOnResponsedoAfterResponseSuccess,这取决于用例哪个更合适。

    但是等待 RELEASED 应该不是问题

    如果连接在错误状态码后被释放,并且观察者正在关闭该连接,那么新请求是否可以并行获取相同的连接?框架是否在内部优雅地处理事情,或者这是使上述方法无效的竞争条件?

    连接池默认采用先进先出租赁策略运行,所以如果池中有空闲连接,你将无法获得相同的连接,如果将连接池切换为后进先出策略则不会出现这种情况。获取时会检查每一个连接是否处于活动状态,只提供一个活动连接供使用。

    更新

    您也可以尝试以下仅使用 WebClient API 而不是 Reactor Netty API 的方法:

    return this.webClient
               .get()
               .uri("/500")
               .retrieve()
               .onStatus(status -> status.equals(HttpStatus.INTERNAL_SERVER_ERROR), clientResponse -> {
                    clientResponse.bodyToFlux(DataBuffer.class)
                                  .subscribe(new BaseSubscriber<DataBuffer>() {
                                      @Override
                                      protected void hookOnSubscribe(Subscription subscription) {
                                          subscription.cancel();
                                      }
                                  });
                    return Mono.error(new IllegalStateException("..."));
               })
               .bodyToMono(String.class);
    

    【讨论】:

    • 感谢您的建议和见解! doAfterResponseSuccess 实际上是我的第一次尝试,但是在响应处理流程中关闭连接似乎发生得太早并导致问题。例如,通过 Spring 的 ResponseSpec 获得的响应式对象可能不再发出 onNext 信号,WebClient 请求就会挂起。
    • 就连接池而言,我仍然担心在重负载下我们可能会遇到竞争条件,即新请求获取 5xx 后释放的连接,验证它仍然存在,开始使用它(第一个问题:DNS 没有根据内容交付网络的要求重新解析),同时观察者异步关闭它(第二个问题:新请求会发生什么?错误?挂起?未定义的行为?)。由于代码对观察到的事件做出异步反应,因此这种竞争条件很难测试。
    • 谢谢,我已经尝试了您的新方法,它似乎确实按预期工作,并且可能解决了我之前描述的不太可能的竞争条件!然而,它的一大缺点是丢失了错误响应的主体(这些可以提供通过WebClientResponseException 实例轻松访问的有价值的信息),并且在它依赖于未记录的内部框架行为的意义上确实感觉有些脆弱(因此可能会在未来的版本中停止工作)。
    • 可以打开WebClient的增强请求
    • 在这里完成:github.com/spring-projects/spring-framework/issues/24846 我已将此答案标记为已接受,直到发现更好的答案。 :)
    猜你喜欢
    • 2013-12-15
    • 2019-03-15
    • 1970-01-01
    • 1970-01-01
    • 2021-03-19
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多