【问题标题】:Zip depended fluxesZip 依赖助焊剂
【发布时间】:2020-06-25 09:18:03
【问题描述】:

简化我有

var flux1 : Flux<Integer>;
var flux2 : Flux<Tuple2<A,B>> = flux1.map( id -> dbFindOperation(id) ).cache();
var flux3 : Flux<A> = flux2.map( tuple -> tuple.getT1() );
var flux4 : Flux<B> = flux2.map( tuple -> tuple.getT2() );
var flux5 : Flux<Tuple3<Tuple2<A,B>,A,B>> = Flux.zip(flux2,flux3,flux4);

当我订阅flux5时,我希望flux2使每个数据库查找操作时,项目传播到flux3、flux4然后传播到flux5会尽可能快。但根据日志和调试信息,所有项目首先加载到flux2,只有在所有项目都加载后才会继续流。

我怎样才能避免这种情况并让它发挥作用?

【问题讨论】:

    标签: java project-reactor reactor


    【解决方案1】:

    尝试记录下游订阅者从每个通量中请求了多少项目(这可以通过 flux.doOnRequest(LongConsumer) 完成。另外还有 Reactor 参考指南中的“3.3.5.背压”一章。

    您可能会看到订阅者如何首先请求 32 个项目,然后从第二个请求开始请求 24 个项目。如果您没有真正的异步处理,它可能会一次请求所有(Long.MAX_VALUE)并简化处理循环(执行 fusion)。

    这可以通过limitRate(n, 0) 运算符和int prefetch 参数控制,例如Flux.zip()

    更新

    不幸的是,prefetch=1 的 zip 和 limitRate|limitRequest=1 都不起作用。它仍然首先将所有项目加载到内存中,并且 doOnRequest 仍然返回 9223372036854775807。

    试试flux1.map(...).limitRate(1, 0).cache()

    顺便说一句,cache() 是一个无限缓存,但在研究实现时 (FluxReplay$ReplaySubscriber.onSubscribe()) 我认为无论缓存大小如何,它都会从上游请求Long.MAX_VALUE。 (只是看源码的一个猜测,有待验证。)

    关于你的提到limitRequest(n)。小心它!该方法名称相当具有误导性。 -- 实际上它相当于take(n)

    【讨论】:

    • 不幸的是,prefetch=1 的 zip 和 limitRate|limitRequest=1 都不起作用。它仍然首先将所有项目加载到内存中,并且 doOnRequest 仍然返回 9223372036854775807。flux1 包含 ≈ 350 个项目。并且对于来自flux1 dbFindOperation 的每个项目返回0..1 个项目(单声道类型)
    猜你喜欢
    • 1970-01-01
    • 2020-02-29
    • 2015-03-22
    • 1970-01-01
    • 2016-03-12
    • 2015-02-11
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多