【问题标题】:How to limit concurrent http requests with Mono & Flux如何使用 Mono 和 Flux 限制并发 http 请求
【发布时间】:2021-10-21 16:11:46
【问题描述】:

我想处理Flux 以限制Mono 列表发出的并发HTTP 请求。

当一些请求完成时(收到响应),然后服务请求另一个,直到等待请求的总数为 15。

单个请求返回一个列表并根据结果触发另一个请求。

此时,我想以有限的并发性发送请求。 因为消费者端,太多的 HTTP 请求会让对端的服务器陷入困境。

我使用了flatMapMany,如下所示。

public Flux<JsonNode> syncData() {
    return service1
        .getData(param1)
        .flatMapMany(res -> {
                List<Mono<JsonNode>> totalTask = new ArrayList<>();
                Map<String, Object> originData = service2.getDataFromDB(param2);
                res.withArray("data").forEach(row -> {
                       String id = row.get("id").asText();
                       if (originData.containsKey(id)) {
                           totalTask.add(service1.updateRequest(param3));
                       } else {
                            totalTask.add(service1.deleteRequest(param4));
                       }
                       originData.remove(id);
                });
                for (left) {
                    totalTask.add(service1.createRequest(param5));
                }
                return Flux.merge(totalTask);
        });
}
void syncData() {
    syncDataService.syncData().????;
}

我尝试链接.window(15),但它不起作用。所有请求同时发送。

我如何处理Flux 以实现我的目标?

【问题讨论】:

    标签: java spring spring-webflux project-reactor


    【解决方案1】:

    您可以在 Flux 上使用limitRate。您可能需要重新格式化您的代码,但请参阅此处的文档:https://projectreactor.io/docs/core/release/api/reactor/core/publisher/Flux.html#limitRate-int-

    【讨论】:

      【解决方案2】:

      恐怕 Project Reactor 没有提供任何速率或时间限制的实现。

      但是,您可以找到许多提供此类功能并与 Project Reactor 兼容的第三方库。据我所知,resilience4-reactor 支持这一点,并且还兼容 Spring 和 Spring Boot 框架。

      RateLimiterOperator 检查下游订阅者/观察者是否可以获得订阅上游发布者的权限。如果超出速率限制,RateLimiterOperator 可能会延迟从上游请求数据,也可能会向下游订阅者发出 RequestNotPermitted 错误。

      RateLimiter rateLimiter = RateLimiter.ofDefaults("name");
      Mono.fromCallable(backendService::doSomething)
          .transformDeferred(RateLimiterOperator.of(rateLimiter))
      

      更多关于 RateLimiter 模块本身的信息在这里:https://resilience4j.readme.io/docs/ratelimiter

      【讨论】:

        猜你喜欢
        • 2017-06-13
        • 1970-01-01
        • 1970-01-01
        • 1970-01-01
        • 1970-01-01
        • 2015-12-03
        • 2021-09-30
        • 1970-01-01
        • 2020-01-22
        相关资源
        最近更新 更多