【问题标题】:Webflux producer consumer problem (webClient)Webflux生产者消费者问题(webClient)
【发布时间】:2021-01-03 15:21:46
【问题描述】:

您好,我对 WebFlux 和背压有疑问:

    Flux.range(0, 100)
            .flatMap((Integer y) -> {
                return reallySlowApi(); 
            })
            .doOnEach((Signal<String> x1) -> {
                log("next-------" );
            })
            .subscribeOn(Schedulers.elastic())
            .subscribe()
    ;

如何将呼叫限制为每 5 秒一次呼叫。注意:只能修改reallySlowApi。

private Mono<String> reallySlowApi() {
    return webClient
            .get()
            .retrieve()
            .bodyToMono(String.class);
}

编辑:我知道delayElements,但如果 Api 变得更慢,它不会解决问题。我需要使用reallySlowApi 的最佳方式。

【问题讨论】:

  • 这真的取决于你所说的“最佳”是什么意思?如果您希望您的客户最终成功,那么使用退避重试可能是一个不错的选择。如果您想保护下游,您可以查看 ratelimiter 和 Bulkhead 以防止弹性 4j:resilience4j.readme.io/docs/getting-started 但是,您需要提供一些配置,例如并发限制或每秒请求数。
  • 你找到解决办法了吗?
  • 是和否。不是说我很高兴,但不知何故它起作用了。 .flatMap((Integer y) -> { return reallySlowApi();}, 3) => flatMap(mapper, concurrency)

标签: java spring spring-webflux spring-webclient


【解决方案1】:

一种方法是使用 delayElements()

  public void run() {
    Flux.range(0, 100)
        .delayElements(Duration.ofSeconds(5)) // only emit every 5 seconds
        .flatMap(y -> reallySlowApi())
        .doOnNext(x1 -> System.out.println("next-------"))
        .blockLast(); // subscribe AND wait for the flux to complete
  }

  private Mono<String> reallySlowApi() {
    return Mono.just("next");
  }

您还可以使用 Flux.interval() 加上 take() 来限制迭代次数。

   Flux.interval(Duration.ofSeconds(5))
        .take(100)

请注意,您的示例中的 subscribeOn 并没有做任何事情,特别是订阅操作适用于 0-100 范围内的生成,它不是阻塞的。

【讨论】:

  • 好吧,如果慢的 Api 会更慢怎么办(我会得到背压和 429 Too many requests)。也许有办法制作“Mono.just(“next”);”单身人士?也许信号量..
  • 我明白了,你想限制并发请求的数量吗?即使服务快速响应,您是否想限制请求的速率?或者只是为了在服务响应缓慢的情况下产生背压?
  • yes ... 并发请求数。我不想有基于时间的限制器。更像是 2 个并发请求。我知道并行,但它不起作用。
【解决方案2】:

您可以在您的网络客户端代码中使用重试机制

.doOnError(error -> handleError(error.getMessage()))
            .timeout(Duration.ofSeconds(ServiceConstants.FIVE))
            .retryWhen(
                    Retry.backoff(retryCount, Duration.ofSeconds(ServiceConstants.FIVE))
                            .filter(throwable -> throwable instanceof TimeoutException)
            )

【讨论】:

    【解决方案3】:

    只是把我找到的解决方案放在这里。 WebFlux 在映射响应时我们可以传递并发参数来解决这个问题。

    flatMap(mapper, 并发)

    .flatMap((Integer y) -> {
         return reallySlowApi(); 
     } , 3)
    

    【讨论】:

      猜你喜欢
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 2011-07-27
      • 2011-08-29
      • 1970-01-01
      • 1970-01-01
      相关资源
      最近更新 更多