【发布时间】:2017-02-20 00:51:01
【问题描述】:
我想做一些我觉得 Spark Streaming 很奇怪的事情,我想得到一些反馈。
我有一个元组 (String,Int) 的 DStream。假设字符串是一个 id,整数是一个值。
因此,对于微批次,我想计算字段 Int 的平均值,并基于此平均值过滤相同的微批次,例如 field2 > 平均值。所以我写了这段代码:
lineStreams
.foreachRDD(
rdd => {
val totalElement = rdd.count()
if(totalElement > 0) {
val totalSum = rdd.map(elem => elem.apply(1).toInt).reduce(_ + _)
val average = totalSum / totalElement
rdd.foreach(
elem => {
if(elem.apply(1).toInt > average){
println("Element is higher than average")
}
}
)
}
})
但实际上这段代码并没有运行,计算的第一部分看起来不错,但不是测试。 我知道这段代码有一些脏东西,但我只想知道逻辑好不好。
感谢您的建议!
【问题讨论】:
标签: scala apache-spark apache-kafka spark-streaming rdd