【问题标题】:How to emit from Flux onComplete如何从 Flux onComplete 发射
【发布时间】:2021-05-26 13:51:01
【问题描述】:

我正在尝试实现类似于 Akka Streams statefulMapConcat 的东西...基本上我有一个 Flux 的分数是这样的:

Score(LocalDate date, Integer score)

我想吸收这些并每天发出一个聚合:

ScoreAggregate(LocalDate date, Integer scoreCount, Integer totalScore)

所以我有一个聚合器,它保留了我在处理之前设置的一些内部状态,我想对返回 Mono 的聚合器进行平面映射。如果日期发生变化,聚合器只会发出带有值的 Mono,因此您每天只能得到一个。

ScoreAggregator aggregator = ...

Flux<Score> scoreFlux = ...

scoreFlux.flatMap(aggregator::addScore)

所以我的问题是......当scoreFlux 完成时,我如何发出最终元素?聚合器将有一些尚未发出的最后一天的数据,我需要将其发送出去。

【问题讨论】:

  • 我可能不完全理解,但你能不能在 flatmap 调用后使用concatWith() 并添加你想要的其他发布者?
  • 啊……是的。这就是我正在做的事情: .concatWith(Flux.defer(aggregator::onComplete)) 其中 onComplete() 方法将返回最终聚合的 Mono
  • 是的,确切地说 - 这应该有效,因为defer() 确保它不会提前执行:-)

标签: java spring-webflux project-reactor


【解决方案1】:

回应评论作为答案,这样就不会显示为未回答:

所以我的问题是......当 scoreFlux 完成时如何发出最终元素?

您可以在原始通量完成后简单地使用concatWith() 连接您想要的发布者。如果您只希望在原始发布者完成时对其进行评估,请确保将其包装在 Mono.defer() 中,这将防止抢先执行。

【讨论】:

    猜你喜欢
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2013-01-02
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多