【发布时间】:2021-05-26 13:51:01
【问题描述】:
我正在尝试实现类似于 Akka Streams statefulMapConcat 的东西...基本上我有一个 Flux 的分数是这样的:
Score(LocalDate date, Integer score)
我想吸收这些并每天发出一个聚合:
ScoreAggregate(LocalDate date, Integer scoreCount, Integer totalScore)
所以我有一个聚合器,它保留了我在处理之前设置的一些内部状态,我想对返回 Mono 的聚合器进行平面映射。如果日期发生变化,聚合器只会发出带有值的 Mono,因此您每天只能得到一个。
ScoreAggregator aggregator = ...
Flux<Score> scoreFlux = ...
scoreFlux.flatMap(aggregator::addScore)
所以我的问题是......当scoreFlux 完成时,我如何发出最终元素?聚合器将有一些尚未发出的最后一天的数据,我需要将其发送出去。
【问题讨论】:
-
我可能不完全理解,但你能不能在 flatmap 调用后使用
concatWith()并添加你想要的其他发布者? -
啊……是的。这就是我正在做的事情: .concatWith(Flux.defer(aggregator::onComplete)) 其中 onComplete() 方法将返回最终聚合的 Mono
-
是的,确切地说 - 这应该有效,因为
defer()确保它不会提前执行:-)
标签: java spring-webflux project-reactor