【问题标题】:How to optionally skip several processing steps on Flux如何有选择地跳过 Flux 上的几个处理步骤
【发布时间】:2019-12-20 09:24:03
【问题描述】:

我有从套接字接收到的动态热数据流。 我需要检查条件,如果值匹配,则使用新消息跳转到第 3 步。

    final Flux<Msg> msgs = Flux.generate(receiver);

    final Flux<Msg> processed = msgs
        .map(this::checkCondition)  //step1
        .map(remote::doLongRunning) //optional step2
        .map(this::processFurther)  //step3
 ...

    public Msg checkCondition(Msg msg) {
        if(doCheck(msg)){
            //is there a way to jump to step3 here ?
            return new OtherMsg(msg, "someAdditionalData")) 
        } else{
            return msg
        }
    }

我能想到的唯一选择 - 拆分 Flux 并将其组装回来,有没有更清洁的方法?

   final Flux<Msg> msgs = Flux.generate(receiver);

        final Flux<OtherMsg> checked = msgs
            .filter(this::doCheck) //step1
            .map(msg -> new OtherMsg(msg, "someAdditionalData"));

        final Flux<OtherMsg> unchecked = msgs
            .filter(msg -> !doCheck(msg)) //step1
            .map(remote::doLongRunning);  //optional step2

        Flux.merge(checked, unchecked)
            .map(this::processFurther)  //step3


【问题讨论】:

    标签: java optional flux project-reactor


    【解决方案1】:

    您不能跳过一个步骤,但您可以将flatMap() 与三元运算符一起使用以作为条件分支的一种形式:

    final Flux<Msg> processed = msgs
            .flatMap(msg -> doCheck(msg)
                ? Mono.just(new OtherMsg(msg, "someAdditionalData")).map(remote::doLongRunning)
                : Mono.just(msg))
            .map(this::processFurther);
    

    这样你可以调用任何其他方法来操作三元表达式第一部分的值,如果doCheck()返回false,第二部分将确保它被绕过。 processFurther() 将在 flatMap() 调用之后执行,因此无论如何都会执行。

    【讨论】:

      猜你喜欢
      • 2023-03-19
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 2018-02-08
      • 2015-02-22
      • 2017-08-03
      • 2014-12-09
      相关资源
      最近更新 更多