【问题标题】:Complete WebClient asynchronous example with Spring WebFlux使用 Spring WebFlux 完成 WebClient 异步示例
【发布时间】:2021-06-09 17:28:01
【问题描述】:

我是 Reactive 编程范式的新手,但最近我决定在 Spring WebClient 上构建一个简单的 Http 客户端,因为旧的同步 RestTemplate 已经在维护中,可能会在即将发布的版本中被弃用。

所以我首先查看了 Spring 文档,然后我在网上搜索了示例。

我必须说(暂时)我有意识地决定不阅读 Reactor lib 文档,所以除了发布者-订阅者模式之外,我对 Mono 和 Flux 的了解很少.相反,我专注于让某些事情发挥作用。

我的场景是一个简单的 POST,用于向服务器发送回调,客户端只对响应状态代码感兴趣。没有返回任何身体。所以我终于想出了这个有效的代码sn-p:

private void notifyJobSuccess(final InternalJobData jobData) {
        
        SuccessResult result = new SuccessResult();
        result.setJobId(jobData.getJobId());
        result.setStatus(Status.SUCCESS);
        result.setInstanceId(jobData.getInstanceId());
        
        log.info("Result to send back:" + System.lineSeparator() + "{}", result.toString());
        
        this.webClient.post()
            .uri(jobData.getCallbackUrl())
            .body(Mono.just(result), ReplaySuccessResult.class)
            .retrieve()
            .onStatus(s -> s.equals(HttpStatus.OK), resp -> {   
                log.info("Expected CCDM response received with HttpStatus = {}", HttpStatus.OK);
                return Mono.empty();
            })
            .onStatus(HttpStatus::is4xxClientError, resp -> {   
                log.error("CCDM response received with unexpected Client Error HttpStatus {}. "
                        + "The POST request sent by EDA2 stub did not match CCDM OpenApi spec", resp.statusCode());
                return Mono.empty();
            })
            .onStatus(HttpStatus::is5xxServerError, resp -> {   
                log.error("CCDM response received with unexpected Server Error HttpStatus {}", resp.statusCode());
                return Mono.empty();
            }).bodyToMono(Void.class).subscribe(Eda2StubHttpClient::handleResponseFromCcdm);
        
    }

我对响应式 WebClient 工作原理的理解很差,从调用订阅开始。我在编写客户端代码之前检查的数十个示例中没有一个包含这样的调用,但事实是,在我包含该调用之前,服务器一直在等待请求。

然后我碰到了口头禅“在您订阅之前什么都不会发生”。知道 Plublisher-Subscriber 模式我知道,但我(错误地)假设订阅是由 WebClient API 在任何交换或 bodyToMono 方法中处理的...... block() 绝对必须订阅,因为当你阻止它时,请求立即发出。

所以我的第一个问题是:真的需要这个 subscribe() 调用吗?

第二个问题是为什么 StubHttpClient::handleResponse 方法永远不会被回调。为此,我发现的唯一解释是,由于返回的 Mono 是 Mono<Void>,因为响应中除了状态代码之外没有任何内容,因为它从未实例化,所以该方法完全是虚拟的......我什至可以仅用 .subscribe() 替换它。这是一个正确的假设吗?

最后,要求提供一个在 Mono 中接收主体的方法的完整示例,然后再使用它是不是太过分了?我发现的所有示例都只关注发出请求,但后来如何使用 Mono 或 Flux 现在超出了我的理解......我知道我最终必须尽快检查 Reactor 文档,但我会很感激有点帮助,因为我在处理异常和错误时遇到问题。

谢谢!

【问题讨论】:

  • I must say that I have consciously decided not to go through the Reactor lib documentation 然后我建议你这样做how much research effort is expected of stack overflow users
  • @ToerkTumlare,我承认在尝试使用 Spring Reactive WebClient 之前我还没有深入研究过 Reactor 文档,我很确定,就像数百人一样。但我可以肯定地说,在尝试自己寻找解决方案/解释之前,我从不发布任何问题。事实上,您正在将我重定向到一个与我的问题非常相似的问题,但没有承认尚未调查的内容。无论如何,如果你不这么认为,那对我来说很好。归根结底,我不太在乎声誉。
  • @Toerktumlare,您建议的帖子仅解决了我的第一个问题,是的,没错。有了这个,我还想发一篇文章,有人可以帮助提供一个使用 WebClient 的完整示例,而不仅仅是遍布网络的数千行代码只是为了从应用程序中获取请求,这很容易部分。
  • 最后,老实说,如果您认为这个问题没有进行太多调查,我无法想象您对您所指的那个问题的看法,询问邮递员的浏览器如何处理 Mono 和Flux 调用 subscribe()... 在将其发回给我之前,您是否已阅读了两遍?拜托,如果您在这方面有经验,我认为您提供一个好的和完整的例子会更有帮助,不仅对我,而且对社区。谢谢!

标签: spring-webflux spring-webclient


【解决方案1】:

自从我在这里寻求帮助已经过去了一段时间。现在我不想编辑,而是为我之前的问题添加一个答案,以便答案保持清晰并且与原始问题和 cmets 分开。 所以这里有一个完整的例子。

上下文: 一个应用程序,作为客户端,从 OAuth2 授权服务器请求访问令牌。 Access Token 是异步请求的,以避免在另一端处理令牌请求并响应到达时阻塞应用程序的线程。

首先,这是一个向其客户端提供访问令牌的类(方法getAccessToken):如果访问令牌已经初始化并且有效,则返回存储的值;否则获取一个调用内部方法fetchAccessTokenAsync的新方法:

public class Oauth2ClientBroker {
private static final String OAUHT2_SRVR_TOKEN_PATH= "/auth/realms/oam/protocol/openid-connect/token";
private static final String GRANT_TYPE = "client_credentials";

@Qualifier("oAuth2Client")
private final WebClient oAuth2Client;

private final ConfigurationHolder CfgHolder;

@GuardedBy("this")
private String token = null;

@GuardedBy("this")
private Instant tokenExpireTime;

@GuardedBy("this")
private String tokenUrlEndPoint;

public void getAccessToken(final CompletableFuture<String> completableFuture) {

    if (!isTokenInitialized() || isTokenExpired()) {
        log.trace("Access Token not initialized or has exired: go fetch a new one...");
        synchronized (this) {
            this.token = null;
        }
        fetchAccessTokenAsync(completableFuture);
    } else {
        log.trace("Reusing Access Token (not expired)");
        final String token;
        synchronized (this) {
            token = this.token;
        }
        completableFuture.complete(token);
    }
}

... }

接下来,我们将看到fetchAccessTokenAsync 会:

private void fetchAccessTokenAsync(final CompletableFuture<String> tokenReceivedInFuture) {

    Mono<String> accessTokenResponse = postAccessTokenRequest();
    accessTokenResponse.subscribe(tr -> processResponseBodyInFuture(tr, tokenReceivedInFuture));

}

这里发生了两件事:

  1. 方法postAccessTokenRequest() 使用exchangeToMono 构建一个POST 请求并声明如何使用响应(当WebFlux 在收到响应后使其可用时):
private Mono postAccessTokenRequest() {

        log.trace("Request Access Token for OAuth2 client {}", cfgHolder.getClientId());

        final URI uri = URI.create(cfgHolder.getsecServiceHostAndPort().concat(OAUHT2_SRVR_TOKEN_PATH));
            } else {
                uri = URI.create(tokenUrlEndPoint);
            }

        }
        log.debug("Access Token endpoint OAuth2 Authorization server: {}", uri.toString());

        return oAuth2Client.post().uri(uri)
                .body(BodyInserters.fromFormData("client_id", cfgHolder.getEdaClientId())
                        .with("client_secret", cfgHolder.getClientSecret())
                        .with("scope", cfgHolder.getClientScopes()).with("grant_type", GRANT_TYPE))
                .exchangeToMono(resp -> {
                    if (resp.statusCode().equals(HttpStatus.OK)) {
                        log.info("Access Token successfully obtained");
                        return resp.bodyToMono(String.class);
                    } else if (resp.statusCode().equals(HttpStatus.BAD_REQUEST)) {
                        log.error("Bad request sent to Authorization Server!");
                        return resp.bodyToMono(String.class);
                    } else if (resp.statusCode().equals(HttpStatus.UNAUTHORIZED)) {
                        log.error("OAuth2 Credentials exchange with Authorization Server failed!");
                        return resp.bodyToMono(String.class);
                    } else if (resp.statusCode().is5xxServerError()) {
                        log.error("Authorization Server could not generate a token due to a server error");
                        return resp.bodyToMono(String.class);
                    } else {
                        log.error("Authorization Server returned an unexpected status code: {}",
                                resp.statusCode().toString());
                        return Mono.error(new Exception(
                                String.format("Authorization Server returned an unexpected status code: %s",
                                        resp.statusCode().toString())));
                    }
                }).onErrorResume(e -> {
                    log.error(
                            "Access Token could not be obtained. Process ends here");
                    return Mono.empty();
                });
    }

exchangeToMono 方法在这里完成了大部分魔法:告诉 WebFlux 返回一个 Mono,一旦收到响应就会异步接收信号,包裹在 ClientResponse 中,参数 resp 在拉姆达。但重要的是要记住,此时尚未发出任何请求;我们只是传入一个函数,该函数将在它到达时采用ClientResponse,并将返回一个带有我们感兴趣的主体部分的Mono&lt;String&gt;(访问令牌,我们将看到)。

  1. 一旦构建了 POST 并返回了 Mono,那么当我们订阅之前返回的 Mono&lt;String&gt; 时,真正的事情就开始了。正如 Reacive 的口头禅所说:nothing happens until you subscribe 或者,在我们的例子中,the request is not actually sent until something attempts to read or wait for the response。 WebClient fluent API 中还有其他方式可以隐式订阅,但我们在这里选择了显式方式来返回 Mono - 它实现了反应器 Publisher 接口 - 并订阅它。在这里,我们不再阻塞线程,释放 CPU 用于其他事情,可能比等待答案更有用。

到目前为止,一切都很好:我们已经发出了请求,释放了 CPU,但是当响应到来时,哪里会继续处理呢? subscribe() 方法将消费者作为参数作为参数,在我们的例子中使用字符串参数化,不亚于我们正在等待的响应的主体,包装在 Mono 中。当响应到来时,WebFlux 会将事件通知给我们的 Mono,Mono 会调用 processResponseBodyInFuture 方法,最终我们会收到响应体:

private void processResponseBodyInFuture(final String body, final CompletableFuture<String> tokenReceivedInFuture) {

    DocumentContext jsonContext = JsonPath.parse(body);

    try {
        log.info("Access Token response received: {}", body);
        final String aTkn = jsonContext.read("$.access_token");
        log.trace("Access Token parsed: {}", aTkn);
        final int expiresIn = jsonContext.read("$.expires_in");
        synchronized (this) {
            this.token = aTkn;
            this.tokenExpireTime = Instant.now().plusSeconds(expiresIn);
        }
        log.trace("Signal Access Token request completion. Processing will continue calling client...");
        tokenReceivedInFuture.complete(aTkn);
    } catch (PathNotFoundException e) {
        try {
            log.error(e.getMessage());
            log.info(String.format(
                    "Could not extract Access Token. The response returned corresponds to the error %s: %s",
                    jsonContext.read("$.error"), jsonContext.read("$.error_description")));
        } catch (PathNotFoundException e2) {
            log.error(e2.getMessage().concat(" - Unexpected json content received from OAuth2 Server"));
        }
    }

}

只要 Mono 收到有关接收响应的信号,就会调用此方法。因此,在这里我们尝试使用访问令牌解析 json 内容并对其进行处理...在这种情况下,将complete() 调用到由初始方法getAccessToken 的调用者传入的CompletableFuture 上,希望会知道怎么办。我们的工作在这里完成......异步!

总结: 总而言之,当您使用响应式 WebClient 时,这些是发送请求和处理响应的基本注意事项:

  1. 考虑有一个方法负责通过 WebClient fluent API 准备请求(设置 http 方法、uri、标头和正文)。请记住:这样做您还没有发送任何请求。
  2. 考虑一下您将使用何种策略来获取将接收 http 客户端事件(响应或错误)的Publisherretreive() 是最直接的方法,但它操纵响应的能力比exchangeToMono 要小。
  3. 订阅... 否则什么都不会发生。 你会发现很多例子会欺骗你:他们声称将 WebClient 用于异步,但随后他们“忘记”订阅 Publisher 并调用block()。好吧,虽然这使事情变得更容易并且它们似乎可以工作(您会看到收到的响应并传递给您的应用程序),但问题是这不再是异步的:您的 Mono(或 Flux,无论您使用什么)将一直阻塞,直到响应到达。不好。
  4. 有一个单独的方法(作为 subscribe() 方法中传递的消费者)处理响应正文。

【讨论】:

    猜你喜欢
    • 2018-05-09
    • 2019-06-04
    • 2018-03-18
    • 2020-07-25
    • 2018-08-18
    • 1970-01-01
    • 2020-01-30
    • 1970-01-01
    • 2017-12-31
    相关资源
    最近更新 更多