【发布时间】:2019-12-20 09:24:03
【问题描述】:
我有从套接字接收到的动态热数据流。 我需要检查条件,如果值匹配,则使用新消息跳转到第 3 步。
final Flux<Msg> msgs = Flux.generate(receiver);
final Flux<Msg> processed = msgs
.map(this::checkCondition) //step1
.map(remote::doLongRunning) //optional step2
.map(this::processFurther) //step3
...
public Msg checkCondition(Msg msg) {
if(doCheck(msg)){
//is there a way to jump to step3 here ?
return new OtherMsg(msg, "someAdditionalData"))
} else{
return msg
}
}
我能想到的唯一选择 - 拆分 Flux 并将其组装回来,有没有更清洁的方法?
final Flux<Msg> msgs = Flux.generate(receiver);
final Flux<OtherMsg> checked = msgs
.filter(this::doCheck) //step1
.map(msg -> new OtherMsg(msg, "someAdditionalData"));
final Flux<OtherMsg> unchecked = msgs
.filter(msg -> !doCheck(msg)) //step1
.map(remote::doLongRunning); //optional step2
Flux.merge(checked, unchecked)
.map(this::processFurther) //step3
【问题讨论】:
标签: java optional flux project-reactor