【问题标题】:Spring Reactive - collect a sequence of paged results as a Mono of all resultsSpring Reactive - 收集一系列分页结果作为所有结果的 Mono
【发布时间】:2018-07-31 14:00:24
【问题描述】:

在我的 REST 服务中,我必须多次调用另一个 REST 服务才能获取结果列表的所有页面。该请求包含一个 from 字段,我需要随每个请求增加该字段。响应包含一个 totalResults 字段 - 当我读取所有结果后,我需要停止调用其他服务,收集所有调用的所有结果并生成一个 Mono<List<Result>> 响应。

这是我目前所拥有的:

@Getter
public class Request {
    private int from;
    private int size = 1000;
    private String type;

    public Request(String type, int from) {
        this.type = type;
        this.from = from;
    }
}

@Getter
@Setter
public class Response {
    private Integer totalResults;
    private Integer size;
    private Integer from;
    private List<Result> results;
}

public Mono<List<Result>> findByType(String type) {
    return Flux.generate(
            () -> new Request(type, 0),
            (Request request, SynchronousSink<List<Result>> sink) -> {
                Response response = find(request).block();
                sink.next(response.getResults());
                int nextFrom = response.getFrom() + response.getSize();
                if (nextFrom >= response.getTotalResults()) {
                    sink.complete();
                }
                return new Request(type, nextFrom);
            })
            .flatMap(Flux::fromIterable)
            .collectList();
}

private Mono<Response> find(Request request) {
    return webClient
            .post()
            .uri("/search")
            .syncBody(request)
            .retrieve()
            .bodyToMono(Response.class);
}

它在使用 MockWebServerStepVerifier 的测试中工作,但在生产中失败

java.lang.IllegalStateException: block()/blockFirst()/blockLast() are blocking, which is not supported in thread reactor-http-nio-2

我怎样才能以正确的反应方式做到这一点?

【问题讨论】:

    标签: java spring spring-webflux project-reactor


    【解决方案1】:

    编辑在 Adam 的帮助下,expand 函数修复了这个问题

    public Mono<List<Result>> findByType(Request request) {
            return find(request)
                    .expand(response -> {
                        int nextFrom = response.getFrom() + response.getSize();
                        if (nextFrom >= response.getTotalResults()) {
                            return Mono.empty();
                        }
                        return find(new Request(request.getType(), response.getFrom() + response.getSize()));
                    })
                     .flatMap(response -> Flux.fromIterable(response.getResults()))
                     .collectList();;
        }
    
        private Mono<Response> find(Request request) {
            return webClient
                    .post()
                    .uri("/search")
                    .contentType(MediaType.APPLICATION_JSON)
                    .accept(MediaType.APPLICATION_JSON)
                    .syncBody(request)
                    .retrieve()
                    .bodyToMono(Response.class);
        }
    

    【讨论】:

    • 我使用了相同的方法,但在我的情况下,expand 没有终止
    猜你喜欢
    • 1970-01-01
    • 1970-01-01
    • 2015-01-07
    • 2010-12-31
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多