【问题标题】:Make Http call using ReactiveX for Java使用 ReactiveX for Java 进行 Http 调用
【发布时间】:2017-02-24 23:18:19
【问题描述】:

我是 ReactiveX for Java 的新手,我有以下代码块可以进行外部 http 调用,但它不是异步的。我们正在使用 rxjava 1.2 和 Java 1.8

  private ResponseEntity<String> callExternalUrl(String url, String json, HttpMethod method) {

    RestTemplate restTemplate;
    HttpEntity request;

      request = new HttpEntity(jsonContent, httpHeaders);

    return restTemplate.exchange(url, httpMethod, request, String.class);

  }

我在网上找到了以下代码块,但我无法完全理解它以及如何将它应用到我的代码库中。

private RxClient<RxObservableInvoker> httpClient;
public <T> Observable<T> fetchResult(String url, Func1<Response, T> mapper) {

    return httpClient.target(url)
        .request()
        .rx()
        .get()
        .subscribeOn(Schedulers.io())
        .map(mapper);
  }

【问题讨论】:

    标签: java asynchronous reactive-programming resttemplate reactivex


    【解决方案1】:

    如果我理解正确,你需要这样的东西来包装你现有的callExternalUrl

    static Observable<String> callExternalUrlAsync(String url, String json, HttpMethod method)
    {
        return Observable.fromCallable(() -> callExternalUrl(url, json, method))
                .subscribeOn(Schedulers.io())
                .flatMap(re -> {
                             if (re.hasBody())
                                 return Observable.just(re.getBody());
                             else
                                 return Observable.error(new RuntimeException("Bad response status " + re.getStatusCode()));
                         },
                         e -> Observable.error(e),
                         (Func0<Observable<? extends String>>) (() -> Observable.empty())) // I need explicit cast or it won't compile :-(
                .observeOn(Schedulers.computation());
    }
    

    代码的简短描述:

    1. 它计划在Schedulers.io 上执行现有的callExternalUrl
    2. ResponseEntity&lt;T&gt; 转换为成功的T 和错误情况的最小转换。它也发生在io 调度程序上,但它并不重要,因为它真的很短。 (如果callExternalUrl内部有异常,则按原样传递。)
    3. 订阅要在Schedulers.computation 上执行的结果

    注意事项

    1. 您可能希望为subscribeOnobserveOn 使用自定义调度程序
    2. 您可能希望在传递给flatMap 的第一个 lambda 中有一些更好的逻辑来区分成功和错误,并且肯定需要一些更具体的异常类型。

    高阶魔法

    如果您愿意使用高阶函数并用一点性能换取更少的代码重复,您可以这样做:

    // Universal wrapper method
    static <T> Observable<T> wrapCallExternalAsAsync(Func3<String, String, HttpMethod, ResponseEntity<T>> externalCall, String url, String json, HttpMethod method)
    {
        return Observable.fromCallable(() -> externalCall.call(url, json, method))
                .subscribeOn(Schedulers.io())
                .flatMap(re -> {
                             if (re.hasBody())
                                 return Observable.just(re.getBody());
                             else
                                 return Observable.error(new RuntimeException("Bad response status " + re.getStatusCode()));
                         },
                         e -> Observable.error(e),
                         (Func0<Observable<? extends T>>) (() -> Observable.empty())) // I need explicit cast or it won't compile :-(
                .observeOn(Schedulers.computation());
    }
    
    static Observable<String> callExternalUrlAsync_HigherOrder(String url, String json, HttpMethod method)
    {
        return wrapCallExternalAsAsync(MyClass::callExternalUrl, url, json, method);
    }
    

    MyClass 是您的 callExternalUrl 所在的位置。


    更新(仅限异步调用)

    私有静态 RxClient httpClient = Rx.newClient(RxObservableInvoker.class); // 这里你可能会传递自定义的 ExecutorService

    private <T> Observable<String> executeHttpAsync(String url, String httpMethod, Entity<T> entity) {
        return httpClient.target(url)
                .request()
                .headers(httpHeaders) // assuming httpHeaders is something global as in your example
                .rx()
                .method(httpMethod, entity)
                .map(resp -> {
                    if (200 != resp.getStatus()) {
                        throw new RuntimeException("Bad status code " + resp.getStatus());
                    } else {
                        if (!resp.hasEntity()) {
                            // return null; // or error?
                            throw new RuntimeException("Empty response"); // or empty?
                        } else {
                            try {
                                return resp.readEntity(String.class);
                            } catch (Exception ex) {
                                throw new RuntimeException(ex); // wrap exception into unchecked
                            }
                        }
                    }
                })
                .observeOn(Schedulers.computation());
    }
    
    private Observable<String> executeGetAsync(String url) {
        return executeHttpAsync(url, "GET", null);
    }
    
    private Observable<String> executePostAsync(String url, String json) {
        return executeHttpAsync(url, "POST", Entity.json(json));
    }
    

    同样的警告适用:

    1. 您可能希望将自定义调度程序用于newClient 调用和observeOn
    2. 您可能希望有一些更好的错误处理逻辑,而不仅仅是检查它是否是 HTTP 200,而且您肯定需要一些更具体的异常类型。但这都是特定于业务逻辑的,因此取决于您。

    从您的示例中也不清楚请求的主体 (HttpEntity) 是如何构建的,以及您是否真的总是希望 String 作为原始示例中的响应。我还是照原样复制了你的逻辑。如果你需要更多的东西,你可能应该参考https://jersey.java.net/documentation/2.25/media.html#json的文档

    【讨论】:

    • 感谢您的回复。我可以试试你做的,但我忘了包括我们需要使用 RxClient httpClient;
    • 当您说“我需要显式转换,否则它不会编译”时,您要转换哪个部分以及转换为什么类型?谢谢
    • @WowBow,flatMap 的最后一个参数应该是 Func0&lt;? extends Observable&lt;? extends T&gt;&gt; 类型,但在我的机器上,lambda 表达式 () -&gt; Observable.empty() 是用一些更具体的函数类型推断出来的,没有编译.至于“我们需要使用 RxClient”,你的意思是你不需要你原来的同步方法,只用异步方法就可以了吗?
    • 是的,我必须摆脱 rest 模板并使用 RxClient 异步调用。如果您可以添加如何以这种方式实现它(包括标题),我很高兴。
    • @WowBow,所以现在您遇到了一个不同的问题,即当您发送 post 请求时,您会收到 HTTP 411 错误“请求必须分块或具有内容长度”?我不知道为什么会这样,在我的本地测试中,我可以看到作为 POST 请求的一部分发送的有效Content-Length。我认为您需要展示您当前的实际代码,甚至可能创建一个新问题,因为它对于 cmets 来说太复杂了。没有Minimal, Complete, and Verifiable example,很难猜到
    猜你喜欢
    • 1970-01-01
    • 2013-12-26
    • 1970-01-01
    • 2020-03-20
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多