【Question title】: Wait for multiple HTTP responses using Reactor
【Posted】: 2019-10-01 01:18:22
【Question】:

I am using Spring Reactor to make multiple HTTP calls and wait for all of the results. Here is my code:

final Map<String, WSSearchResult> reduced = new HashMap<>();
List<Mono<ClientResponse>> monos = new ArrayList<>();
for (int i=0;i<10;i++) {
    log.info("Executing http call {}", i);
    WSSearchRequest wsSearchRequest = WSSearchRequest.builder().param(i).build();

    Mono<ClientResponse> exchange = webClient.post().uri("/search/availability")
            .body(BodyInserters.fromObject(wsSearchRequest)).exchange();
    monos.add(exchange);
}

final CountDownLatch latch = new CountDownLatch(monos.size());
Flux.merge(monos).subscribe(clientResponse -> {
    List<WSSearchResult> partialResult = clientResponse.bodyToFlux(WSSearchResult.class).collectList().block();
    List<WSSearchResult> partial =
            partialResult.parallelStream().filter(w-> !Strings.isNullOrEmpty(w.getId())).
                    collect(Collectors.toList());

    mapAndReduce(partial, reduced);
});

try {
    latch.await(150, TimeUnit.SECONDS);
} catch (InterruptedException e) {
    log.error(e.getMessage(), e);
}

The problem is that the code above throws this exception:

reactor.core.Exceptions$ErrorCallbackNotImplemented: java.lang.IllegalStateException: block()/blockFirst()/blockLast() are blocking, which is not supported in thread reactor-http-epoll-6
Caused by: java.lang.IllegalStateException: block()/blockFirst()/blockLast() are blocking, which is not supported in thread reactor-http-epoll-6
    at reactor.core.publisher.BlockingSingleSubscriber.blockingGet(BlockingSingleSubscriber.java:77) ~[reactor-core-3.2.8.RELEASE.jar!/:3.2.8.RELEASE]
    at reactor.core.publisher.Mono.block(Mono.java:1494) ~[reactor-core-3.2.8.RELEASE.jar!/:3.2.8.RELEASE]

I am new to Reactor development and I don't understand why it throws this exception.

Can someone point me in the right direction?

【Comments】:

    Tags: spring http asynchronous microservices project-reactor


    【Solution 1】:

    I think this code could be rewritten as follows:

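    // Build the ten requests up front; nothing is sent until the pipeline is subscribed.
    List<Mono<ClientResponse>> clientResponses = IntStream.range(0, 10)
        .mapToObj(i -> WSSearchRequest.builder().param(i).build())
        .map(request -> send(request))
        .collect(toList());
    
    // Decode, filter, and merge inside the pipeline instead of calling block() in a callback.
    // map(...) and reduce(...) are helpers that replace mapAndReduce; they are not shown here.
    Mono<Map<String, WSSearchResult>> reduced = Flux.merge(clientResponses)
        .flatMap(clientResponse ->
            clientResponse.bodyToFlux(WSSearchResult.class)
                .filter(result -> !Strings.isNullOrEmpty(result.getId()))
                .collectList()
                .map(listOfFilteredResults -> map(listOfFilteredResults))
        ).reduce(new HashMap<>(), (left, right) -> reduce(left, right));
    
    // The timeout replaces the CountDownLatch; handle(...) is also left to the reader.
    reduced
      .timeout(Duration.ofSeconds(150))
      .subscribe(result -> handle(result),
                 error -> log.error(error.getMessage(), error));
    
    private Mono<ClientResponse> send(WSSearchRequest request) {
      return webClient.post().uri("/search/availability")
          .body(BodyInserters.fromObject(request))
          .exchange();
    }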
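
    The snippet above calls map(...) and reduce(...) without defining them. As a rough sketch of what they might look like, the plain-Java example below indexes one response's results by id and then folds two such maps together. Everything specific in it is an assumption made for illustration: the Result class stands in for WSSearchResult (which the post never shows), the price field is invented, and the "keep the cheaper entry" rule is only one possible way to resolve duplicate ids.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Function;
import java.util.stream.Collectors;

class SearchResultMerging {

    // Hypothetical stand-in for WSSearchResult; the real class is not shown in the post.
    static final class Result {
        final String id;
        final int price; // invented field, used only to pick a winner on duplicate ids

        Result(String id, int price) {
            this.id = id;
            this.price = price;
        }
    }

    // Assumed merge policy: when two results share an id, keep the cheaper one.
    static Result cheaper(Result a, Result b) {
        return a.price <= b.price ? a : b;
    }

    // Plays the role of map(...): index the results of one response by id.
    static Map<String, Result> map(List<Result> results) {
        return results.stream()
                .collect(Collectors.toMap(r -> r.id, Function.identity(),
                        SearchResultMerging::cheaper));
    }

    // Plays the role of reduce(...): fold `right` into a copy of `left`.
    static Map<String, Result> reduce(Map<String, Result> left, Map<String, Result> right) {
        Map<String, Result> merged = new HashMap<>(left);
        right.forEach((id, result) -> merged.merge(id, result, SearchResultMerging::cheaper));
        return merged;
    }

    public static void main(String[] args) {
        Map<String, Result> first = map(Arrays.asList(new Result("a", 120), new Result("b", 80)));
        Map<String, Result> second = map(Arrays.asList(new Result("b", 70), new Result("c", 95)));
        Map<String, Result> all = reduce(first, second);
        System.out.println(all.size() + " ids, b costs " + all.get("b").price); // 3 ids, b costs 70
    }
}
```

    Returning a fresh map from reduce, rather than mutating left, also leaves the seed passed to reduce(new HashMap<>(), ...) untouched, which matters if the resulting Mono is ever subscribed more than once.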
    
