【问题标题】:How to calculate the average of a Project Reactor Flux?如何计算项目反应堆通量的平均值?
【发布时间】:2019-07-12 13:43:13
【问题描述】:

我有一个返回 Flux<SensorData> 的方法,假设 SensorData 有一个字段 measure: Integer

我想计算整个 Flux 的测量平均值。怎么办?


val sensorFlux: Flux<SensorData> = sensorRepository.findAll()
...

【问题讨论】:

    标签: spring reactive-programming spring-webflux project-reactor


    【解决方案1】:
    Mono<Double> average = sensorFlux.collect(Collectors.averagingInt(SensorData::getMeasure))
    

    【讨论】:

    • 请注意,collect 会对onComplete 事件做出反应,如果Flux 是“无限的”(传感器数据可能是无限通量),则永远不会发生这种情况
    • @SimonBaslé 谢谢。但是我们怎么可能在不等待完成的情况下计算平均值呢?我猜另一种方法是有一个运行平均值,因此返回一个 Flux,它会在每次传感器发射时发射一个新的平均值。
    • 是的,在这种情况下,您可能需要对源进行窗口化并计算一段时间内的运行平均值,可能
    【解决方案2】:

    使用辅助对象的另一种方法:

    val average = sensorFlux.map { it.measure }
                    .map { Measure(1, it) }
                    .reduce { t: Measure, u: Measure -> Measure(t.elements + u.elements, t.sum + u.sum) }
                    .map { it.sum / it.elements }
    
    ---
    
    data class Measure(var elements: Int, var sum: Long)
    

    【讨论】:

      猜你喜欢
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 2019-02-07
      相关资源
      最近更新 更多