【发布时间】:2020-06-02 13:10:07
【问题描述】:
我正在使用 Flux 构建我的反应式管道。在管道中,我需要调用 3 个不同的外部系统 REST API,它们的访问速率非常严格。 如果我超出每秒速率阈值,我将被指数级限制。每个系统都有自己的阈值。
我正在使用 Spring WebClient 进行 REST API 调用;在 3 个 API 中,其中 2 个是 GET,1 个是 POST。
在我的反应器管道中,WebClient 被封装在 flatMap 中以执行 API 调用,如下面的代码:
WebClient getApiCall1 = WebClient.builder().build().get("api-system-1").retrieve().bodyToMono(String.class) //actual return DTO is different from string
WebClient getApiCall2 = WebClient.builder().build().get("api-system-2").retrieve().bodyToMono(String.class) //actual return DTO is different from string
WebClient getApiCall3 = WebClient.builder().build().get("api-system-3").retrieve().bodyToMono(String.class) //actual return DTO is different from string
Flux.generator(generator) // Generator pushes the elements from source 1 at a time
// make call to 1st API Service
.flatMap(data -> getApiCall1)
.map(api1Response -> api1ResponseModified)
// make call to 2nd API Service
.flatMap(api1ResponseModified -> getApiCall2)
.map(api2Response -> api2ResponseModified)
// make call to 3rd API Service
.flatMap(api2ResponseModified -> getApiCall3)
.map(api3Response -> api3ResponseModified)
// rest of the pipeline operators
//end
.subscriber();
问题是,如果我没有将concurrency 值设置为flatMap,那么我的管道执行会在服务启动后的几秒钟内突破阈值。
如果我将concurrency 的值设置为1、2、5、10,那么吞吐量会变得非常低。
问题是,在不为并发设置任何值的情况下,如何实现背压来满足外部系统的速率限制?
【问题讨论】:
标签: spring-boot spring-webflux project-reactor reactive-streams spring-webclient