【问题标题】:recursive API call with WebClient and Reactor 3.0使用 WebClient 和 Reactor 3.0 进行递归 API 调用
【发布时间】:2019-03-24 10:31:03
【问题描述】:

我终于学会了使用 Reactor 进行函数式编程。所以我是新手。

我要做的第一件事是使用 WebClient 调用外部 API。这个调用需要递归,因为响应提供了调用参数的下一个值,我需要在下一次调用中使用它,直到满足普通情况。

这是想出的:

    Flux.from(p -> queryUntilNow())
            .flatMap(res -> // res is object )
            .subscribe( // process )



private Flux<ApiResp> queryUntilNow() {
    return Flux.from(p -> {
        queryAPI(since)
                .doOnError(System.out::println)
                .subscribe(apiResp -> {
                    if (since == apiResp.last) return;

                    since = apiResp.last;
                    queryUntilNow();
                });
    });
}

private Flux<ApiResp> queryAPI(int last) {
    Flux<ApiResp> resp = kapi.get()
            .uri("/OHLC?pair={pair}&since={since}&interval={int}", pair, last, interval)
            .retrieve()
            .bodyToFlux(ApiResp.class);

    return resp;
}

看来我需要更多地调整我的思维以适应这种编程风格,所以请给我一些例子和解释。

谢谢!

【问题讨论】:

    标签: java spring-webflux project-reactor


    【解决方案1】:

    如果您只需要遍历分批返回的线性结果(而不是递归树),您可以使用重复通量,其起点在每次重复时都会发生变化。

    这是一个仅模拟 api 调用的完整示例。您可以在 queryFrom 中替换您的 WebClient 调用:

    public class Main {
    
        private static class ApiResp {
            private final int last;
            private ApiResp(int last) {
                this.last = last;
            }
        }
    
        public static void main(String[] args) {
            queryBetween(0, 15)
                    .doOnNext(apiResp -> System.out.println(apiResp.last))
                    .blockLast();
        }
    
        public static Flux<ApiResp> queryBetween(int startInclusive, int endExclusive) {
            // The starting point of the next iteration
            final AtomicReference<Integer> nextIterationStart = new AtomicReference<>(startInclusive);
            return Flux
                    // defer will cause a new Flux with a new starting point to be created for each subscription
                    .defer(() -> queryFrom(nextIterationStart.get()))
                    // update the starting point of the next iteration
                    .doOnNext(apiResp -> nextIterationStart.set(apiResp.last + 1))
                    // repeat with a new subscription if we haven't reached the end yet
                    .repeat(() -> nextIterationStart.get() < endExclusive)
                    // make sure we didn't go past the end if queryFrom returned more results than we need
                    .takeWhile(apiResp -> apiResp.last < endExclusive);
        }
    
        public static Flux<ApiResp> queryFrom(int start) {
            // simulates an api call that always returns 10 results from the starting point
            return Flux.range(start, 10)
                    .map(ApiResp::new);
        }
    }
    

    【讨论】:

    • 基本上这是我想要的,但我的源不是线性的.. 所以我需要一种包容性的.takeWhile(..) 因为在那一步我只知道两个值会在某个时候匹配但是我需要包括第一场比赛
    猜你喜欢
    • 2019-02-14
    • 2021-08-14
    • 2020-12-28
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2020-03-22
    • 1970-01-01
    相关资源
    最近更新 更多