【问题标题】:Reactor Mono publish to multiple methodsReactor Mono 发布到多种方法
【发布时间】:2020-04-24 07:11:44
【问题描述】:

我在将对象发布到多个方法时遇到问题。下面给出了我的代码的简化版本。

package org.example.reactive;

import lombok.extern.slf4j.Slf4j;
import reactor.core.publisher.Mono;
import reactor.util.function.Tuple2;
import reactor.util.function.Tuple3;

@Slf4j
public class MonoTest {

    public static void main(String... args) {
        MonoTest m = new MonoTest();
        Mono<A> aMono = m.getA();
        Mono<B> bMono = aMono.flatMap(m::getB);
        Mono<C> cMono = aMono.flatMap(m::getC);
        Mono<D> dMono = cMono.zipWith(bMono).flatMap(m::getD);
        Mono<E> eMono = Mono.zip(aMono, cMono, dMono)
                .flatMap(m::getE);
        aMono
                .zipWith(eMono)
                .subscribe(m::onCompleted, m::onFailed);
    }

    private Mono<A> getA(){
        log.info("inside getA");
        return Mono.just(new A());
    }
    private Mono<B> getB(A a){
        log.info("inside getB");
        return Mono.just(new B());
    }
    private Mono<C> getC(A a){
        log.info("inside getC");
        return Mono.just(new C());
    }
    private Mono<D> getD(Tuple2 t){
        log.info("inside getD");
        return Mono.just(new D());
    }
    private Mono<E> getE(Tuple3 t){
        log.info("inside getE");
        return Mono.just(new E());
    }
    private void onCompleted(Tuple2 t){
        log.info("inside onCompleted");
    }
    private void onFailed(Throwable t){
        log.info("inside onFailed");
    }

    class A {}
    class B {}
    class C {}
    class D {}
    class E {}
}

我希望每个方法只调用一次。但是 getC 被调用了两次。这里有什么问题?程序输出如下

org.example.reactive.MonoTest - 在 getA 中

org.example.reactive.MonoTest - 在 getC 中

org.example.reactive.MonoTest - 在 getC 中

org.example.reactive.MonoTest - 在 getB 中

org.example.reactive.MonoTest - 在 getD 中

org.example.reactive.MonoTest - 在 getE 中

org.example.reactive.MonoTest - onCompleted 内部

编辑

好吧,我可以通过缓存来解决它,如下所示。

        Mono<A> aMono = m.getA().cache();
        Mono<B> bMono = aMono.flatMap(m::getB).cache();
        Mono<C> cMono = aMono.flatMap(m::getC).cache();
        Mono<D> dMono = cMono.zipWith(bMono).flatMap(m::getD).cache();

【问题讨论】:

    标签: java mono project-reactor spring-reactive


    【解决方案1】:

    您的Monos 集合中有两种模式:

    • aMono 是一个常量,并且由于直接变量分配而被急切地解析一次(您调用一次 getA()
    • 另一方面,其他单声道从运算符内部调用getX() 方法,特别是flatMap。这意味着当订阅平面映射单声道时,这些调用是延迟执行的

    aMonogetX() 方法的唯一顶级调用。 将Mono 变量替换为它们的定义,aMono 除外,这样会更清楚会发生什么:

    MonoTest m = new MonoTest();
    
    Mono<A> aMono = m.getA(); // <-- getA log
    aMono.zipWith(
        Mono.zip(
           aMono,
           aMono.flatMap(m::getC),  // <-- getC log
           aMono.flatMap(m::getC) // <-- getC log
                .zipWith(aMono.flatMap(m::getB)) // <-- getB log
                .flatMap(m::getD) // <-- getD log
        ).flatMap(m::getE) // <-- getE log
      ).subscribe(...);
    

    这就是您获得报告的日志数量和顺序的原因。

    【讨论】:

    • getC 在这里也被调用了两次。
    • 我重写了等效代码,更好地说明了为什么 getC 显示两次,因为您的问题是问“为什么”,而不是“如何解决”
    • 请注意,您可以将 cache() 附加到 cMono 的声明中
    猜你喜欢
    • 1970-01-01
    • 1970-01-01
    • 2021-11-23
    • 1970-01-01
    • 1970-01-01
    • 2020-12-06
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多