【问题标题】:Request is not send without block()没有 block() 就不会发送请求
【发布时间】:2020-02-10 08:46:59
【问题描述】:

我想使用这个 webflux 客户端代码发送带有回复和不带回复的 POST 请求。我试过这个代码实现:

public class RestClientBuilder {
    private String token;
    private String username;
    private String password;
    private URL gatewayUrl;
    private SslContextBuilder sslContextBuilder;

    public static RestClientBuilder builder() {
        return new RestClientBuilder();
    }

    public RestClientBuilder token(String token) {
        this.token = validateAndTrim(token, "Token");
        return this;
    }

    public RestClientBuilder usernamePassword(String username, String password) {
        this.username = validateAndTrim(username, "Username");
        this.password = validateAndTrim(password, "Password");
        return this;
    }

    private String validateAndTrim(String value, final String parameter) {
        if (value == null || value.trim().isEmpty()) {
            throw new IllegalArgumentException(parameter + " is empty");
        }
        return value.trim();
    }

    public RestClientBuilder gatewayUrl(String gatewayUrl) {
        String urlSt = validateAndTrim(gatewayUrl, "Gateway URL");
        try {
            this.gatewayUrl = new URL(urlSt);
        } catch (MalformedURLException e) {
            throw new IllegalArgumentException("Malformed URL: " + urlSt, e);
        }
        return this;
    }

    public RestClientBuilder truststore(File truststoreFile) {
        getSslContextBuilder().trustManager(truststoreFile);
        return this;
    }

    public RestClientBuilder sslCertificate(File keyCertChainFile, File keyFile, String keyPassword) {
        getSslContextBuilder().keyManager(keyCertChainFile, keyFile, keyPassword);
        return this;
    }

    public RestClient build() throws SSLException {
        SslContext sslContext = sslContextBuilder != null ? sslContextBuilder.build() : null;
        return new RestClient(gatewayUrl.toString(), token, username, password, sslContext);
    }

    private SslContextBuilder getSslContextBuilder() {
        if (sslContextBuilder == null) {
            sslContextBuilder = SslContextBuilder.forClient();
        }
        return sslContextBuilder;
    }

}

其余客户端的实现:

public class RestClient {

    private WebClient client;
    private String gatewayUrl;

    public RestClient(String gatewayUrl, String token, String username, String password, SslContext sslContext) {
        this.gatewayUrl = gatewayUrl;
        WebClient.Builder builder = WebClient.builder().baseUrl(gatewayUrl);
        if (sslContext != null) {
            HttpClient httpClient = HttpClient.create().secure(sslContextSpec -> sslContextSpec.sslContext(sslContext));
            ClientHttpConnector httpConnector = new ReactorClientHttpConnector(httpClient);
            builder.clientConnector(httpConnector);
        }
        if (username != null && password != null) {
            builder.filter(basicAuthentication(username, password));
        }
        client = builder.build();
    }

    public Mono<Void> executeOnly(ReportRequest transaction) {
        Mono<ReportRequest> transactionMono = Mono.just(transaction);
        return client.post().uri(gatewayUrl)
                .accept(MediaType.APPLICATION_XML)
                .contentType(MediaType.APPLICATION_XML)
                .body(transactionMono, ReportRequest.class)
                .retrieve()
                .bodyToMono(Void.class);
    }
}

进行远程调用:

public class ReportingProcessor {

    private String URL2 = "......";

    public void collectEnvironmentData() throws JAXBException {

        ReportRequest report = new ReportRequest();
        report.setVersion("1.0");

        RestClient client = null;
        try {
            client = RestClientBuilder.builder()
                    .gatewayUrl(URL2)
//                .token(contract.getTerminal_token())
//                  .usernamePassword("user", "password")
//                .truststore(new File("server.pem"))
//                .sslCertificate(new File("client.pem"), new File("clientKey.p8"), "secret")
                    .build();
        } catch (SSLException e) {
            e.printStackTrace();
        }

        Mono<Void> result = client.executeOnly(report);
        Void response = result.block();

    }

当我删除 Void response = result.block(); 时,请求不会发送。我找不到原因。你能给我一些建议如何在不使用block() 的情况下使客户端代码工作。

【问题讨论】:

  • 你必须在那个单声道上调用 subscribe .. 在你订阅之前什么都不会发生
  • 在您订阅之前,发布者(Flux 或 Mono)不会做任何事情。这是你应该阅读的基本原则,在反应器文档中得到了很好的解释。 block() 订阅并阻塞当前线程,直到发布者完成或抛出。
  • 你能告诉我一个被剪断的代码,请问如何解决这个问题?
  • 我添加了.retrieve() .bodyToMono(Void.class) .subscribe();,但再次请求未发送。知道为什么吗?
  • 你能给我一些指导吗?请修复代码的哪一部分?

标签: java spring spring-boot spring-webflux


【解决方案1】:

我将如何实现你的方法是:

public Mono<Void> executeOnly(ReportRequest transaction) {
    Mono<ReportRequest> transactionMono = Mono.just(transaction);
    return client.post().uri(gatewayUrl)
        .accept(MediaType.APPLICATION_XML)
        .contentType(MediaType.APPLICATION_XML)
        .body(transaction, ReportRequest.class)
        .exchange()
        .then();
}

然后我会按如下方式使用它:

client.executeOnly(report).subscribe()

【讨论】:

    【解决方案2】:

    每当您使用 Spring-webflux 时,您必须牢记一件事。即你不必打破你的链条。因为有必要,有人应该在你的链上调用订阅。因为它适用于 RXJava 规范。

    如果你打破了链条,那么你必须打电话给block(),这不是推荐

    您必须按以下方式修改您的代码。

    假设您有一个处理程序正在调用您的 collectEnvironmentData() 方法,而您的方法正在调用远程服务。

    public  Mono<ServerResponse> handelerMethod(ServerRequest request){
      return collectEnvironmentData().flatMap(aVoid -> ServerResponse.ok().build());
    }
    

    你的方法应该修改为

    public Mono<Void> collectEnvironmentData() throws JAXBException {
    
    ReportRequest report = new ReportRequest();
    report.setVersion("1.0");
    
    RestClient client = null;
    try {
        client = RestClientBuilder.builder()
                .gatewayUrl(URL2)
    //                .token(contract.getTerminal_token())
    //                  .usernamePassword("user", "password")
    //                .truststore(new File("server.pem"))
    //                .sslCertificate(new File("client.pem"), new File("clientKey.p8"), 
    //"secret").build();
    } catch (SSLException e) {
        e.printStackTrace();
    }
    
    return client.executeOnly(report);
    }
    

    以上述方式更改您的实现,希望它会起作用。

    【讨论】:

      【解决方案3】:

      method return type 更改为 Mono&lt;Void&gt; 以进行端到端流式传输。

      public void collectEnvironmentData() throws JAXBException {
      
          ReportRequest report = new ReportRequest();
          report.setVersion("1.0");
      
          RestClient client = null;
          try {
              client = RestClientBuilder.builder()
                      .gatewayUrl(URL2)
                      .build();
          } catch (SSLException e) {
              e.printStackTrace();
          }
      
          return client.executeOnly(report);
      
      }
      

      或者您也可以订阅Mono

      client.executeOnly(report).subscribe();
      

      【讨论】:

        猜你喜欢
        • 2019-10-18
        • 2014-10-19
        • 1970-01-01
        • 2014-11-06
        • 1970-01-01
        • 2016-08-01
        • 2016-07-12
        • 2020-01-25
        • 1970-01-01
        相关资源
        最近更新 更多