【问题标题】:Reactor Flux flatMap operator throughput/concurrency control and achieve backpressureReactor Flux flatMap 算子吞吐量/并发控制并实现背压
【发布时间】:2020-06-02 13:10:07
【问题描述】:

我正在使用 Flux 构建我的反应式管道。在管道中,我需要调用 3 个不同的外部系统 REST API,它们的访问速率非常严格。 如果我超出每秒速率阈值,我将被指数级限制。每个系统都有自己的阈值。

我正在使用 Spring WebClient 进行 REST API 调用;在 3 个 API 中,其中 2 个是 GET,1 个是 POST。

在我的反应器管道中,WebClient 被封装在 flatMap 中以执行 API 调用,如下面的代码:

WebClient getApiCall1 = WebClient.builder().build().get("api-system-1").retrieve().bodyToMono(String.class) //actual return DTO is different from string
WebClient getApiCall2 = WebClient.builder().build().get("api-system-2").retrieve().bodyToMono(String.class) //actual return DTO is different from string
WebClient getApiCall3 = WebClient.builder().build().get("api-system-3").retrieve().bodyToMono(String.class) //actual return DTO is different from string

    Flux.generator(generator) // Generator pushes the elements from source 1 at a time

    // make call to 1st API Service
    .flatMap(data -> getApiCall1)
    .map(api1Response -> api1ResponseModified)

    // make call to 2nd API Service
    .flatMap(api1ResponseModified -> getApiCall2)
    .map(api2Response -> api2ResponseModified)

// make call to 3rd API Service
.flatMap(api2ResponseModified -> getApiCall3)
.map(api3Response -> api3ResponseModified)

// rest of the pipeline operators

//end
.subscriber();

问题是,如果我没有将concurrency 值设置为flatMap,那么我的管道执行会在服务启动后的几秒钟内突破阈值。 如果我将concurrency 的值设置为1、2、5、10,那么吞吐量会变得非常低。

问题是,在不为并发设置任何值的情况下,如何实现背压来满足外部系统的速率限制?

【问题讨论】:

    标签: spring-boot spring-webflux project-reactor reactive-streams spring-webclient


    【解决方案1】:

    鉴于您有“每秒速率”的要求,我会明确地将通量窗口化,并将每个窗口限制在选定的时间段内。这将为您提供最大吞吐量而不会受到限制。

    我会使用类似于以下的辅助函数:

    public static <T> Flux<T> limitIntervalRate(Flux<T> flux, int ratePerInterval, Duration interval) {
        return flux
                .window(ratePerInterval)
                .zipWith(Flux.interval(Duration.ZERO, interval))
                .flatMap(Tuple2::getT1);
    }
    

    它允许你这样做:

    sourceFlux
            .transform(f -> limitIntervalRate(f, 2, Duration.ofSeconds(1))) //Limit to a rate of 2 per second
    

    然后,您可以根据需要将其映射到您的 WebClient 调用上,同时尊重每个 API 的限制:

    sourceFlux
            //...assume API 1 has a limit of 10 calls per second
            .transform(f -> limitIntervalRate(f, 10, Duration.ofSeconds(1)))
            .flatMap(data -> getApiCall1)
            .map(api1Response -> api1ResponseModified)
    
            //...assume API 2 has a limit of 20 calls per second
            .transform(f -> limitIntervalRate(f, 20, Duration.ofSeconds(1))) 
            .flatMap(api1ResponseModified -> getApiCall2)
            .map(api2Response -> api2ResponseModified)
    

    ...等等。

    【讨论】:

    • 感谢@Michael Berry 的建议和示例代码。还有一个问题,我正在浏览这个Reactor 3 Reference docProgrammatically creating a sequence 部分,我在生成器下看到没有提到背压。但是在创建中,我看到支持背压。这是否意味着,如果我需要背压支持,我应该使用 Create 而不是 Generate?
    • @NaveenKumar generate() 从背压的角度来看是理想的场景——需求完全由背压驱动。因此它不需要溢出策略,因为在显式预取元素之前不会请求它。 create() 本质上是另一个 API / 源的包装器,可以随意发出任意数量的事件 - 所以它不是明确的背压控制,因此需要某种形式的溢出策略来确定如果消费者可以表现如何'跟不上。因此,generate() 几乎总是首选,如果你可以使用的话。
    • 我在最初的问题中错过了一个重要点。当我应用调度程序.subscribeOn(Schedulers.boundedElastic()) 时,这一切的表现如何?我看到的是,当我在管道中应用调度程序时,请求率非常高。
    • @NaveenKumar 不确定这是否会有所作为 - 我似乎无法快速制作它。我会为此提出一个新问题,并确保包含minimal reproducible example
    • 如果有多个源(每个请求每个通量)在这种情况下,上述代码将超过每秒的请求数。我们如何控制多通量(多来源)的每秒支付次数
    【解决方案2】:

    【讨论】:

    • 嗨 Martin Tarjányi,感谢您的指点,是的!我确实在探索它们。
    猜你喜欢
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2020-12-28
    • 1970-01-01
    • 2018-06-02
    • 1970-01-01
    • 2021-01-12
    • 2020-10-12
    相关资源
    最近更新 更多