(I'm using Scala as the programming language, as opposed to Java, which was what you asked for.)
Solution 1 - map operator
One solution that came to my mind was to use the map operator.
map[U](func: (T) ⇒ U)(implicit arg0: Encoder[U]): Dataset[U]
Returns a new Dataset that contains the result of applying func to each element.
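The second, implicit parameter list is why map needs an Encoder[U] in scope (in spark-shell, the automatically imported spark.implicits._ supplies one for tuples such as (String, Double)). The toy model below, in plain Scala with the hypothetical types MiniEncoder and MiniDataset, only mimics the shape of that signature: one output element per input element, and the encoder of the result type decides the new schema. Spark's real Encoder also handles serialization to the internal binary row format, which this sketch ignores.

```scala
// Toy model of the *shape* of Dataset.map; this is NOT Spark's API.
// MiniEncoder and MiniDataset are hypothetical names used only for illustration.
trait MiniEncoder[U] { def name: String }

object MiniEncoder {
  implicit val stringEnc: MiniEncoder[String] = new MiniEncoder[String] { val name = "string" }
  implicit val doubleEnc: MiniEncoder[Double] = new MiniEncoder[Double] { val name = "double" }
  // an encoder for a pair exists whenever encoders for both components exist
  implicit def pairEnc[A, B](implicit a: MiniEncoder[A], b: MiniEncoder[B]): MiniEncoder[(A, B)] =
    new MiniEncoder[(A, B)] { val name = s"struct<${a.name},${b.name}>" }
}

final case class MiniDataset[T](rows: Seq[T], schema: String) {
  // Like Dataset.map: apply func to each element (one output per input) and
  // require an encoder for the result type U, which determines the new schema.
  def map[U](func: T => U)(implicit enc: MiniEncoder[U]): MiniDataset[U] =
    MiniDataset(rows.map(func), enc.name)
}

object MapDemo {
  val users: MiniDataset[String] = MiniDataset(Seq("garrison", "marta"), "string")
  // compiles only because a MiniEncoder[(String, Double)] can be derived implicitly
  val lengths: MiniDataset[(String, Double)] = users.map(u => (u, u.length.toDouble))

  def main(args: Array[String]): Unit = println(lengths)
}
```

Dropping the pairEnc definition would make the users.map call fail to compile, which is the same kind of error Spark reports when no Encoder is available for the result type.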
So the solution could be as follows:
scala> months.show
+--------+--------+--------+
|    user|month_01|month_02|
+--------+--------+--------+
|garrison|     3.2|     3.0|
|   marta|    null|     1.8|
| garrett|     4.3|     7.8|
|  harold|     4.5|     3.1|
|   marta|     6.7|     7.2|
|    niko|     4.1|     5.8|
|   james|     9.5|    null|
|   manny|     1.9|     9.8|
| charles|     7.8|     7.6|
+--------+--------+--------+
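import spark.implicits._ // already in scope in spark-shell; provides the Encoder for (String, Double) and toDF

val solution = months.map { r =>
  val skipUserColumn = 1
  // be as generic as possible:
  // the number of month columns can be anything
  val monthsCount = r.size - skipUserColumn
  val nullCount = (skipUserColumn until r.size).count(r.isNullAt)
  // sum only the non-null month values
  val sum = (skipUserColumn until r.size).
    foldLeft(0.0) {
      case (acc, idx) if !r.isNullAt(idx) => acc + r.getDouble(idx)
      case (acc, _) => acc
    }
  // divide by the number of non-null months (0.0 / 0 yields NaN if every month is null)
  (r.getString(0), sum / (monthsCount - nullCount))
}.toDF("user", "month_avg")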
scala> solution.show
+--------+------------------+
|    user|         month_avg|
+--------+------------------+
|garrison|               3.1|
|   marta|               1.8|
| garrett|              6.05|
|  harold|               3.8|
|   marta|              6.95|
|    niko| 4.949999999999999|
|   james|               9.5|
|   manny|5.8500000000000005|
| charles| 7.699999999999999|
+--------+------------------+
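One edge case worth noting: if every month of a row were null, monthsCount - nullCount would be 0 and the division would yield NaN (0.0 / 0). A minimal plain-Scala sketch of the same per-row logic (not Spark code), assuming None stands in for SQL NULL and using a hypothetical helper avgIgnoringNulls, could return None for that case instead:

```scala
// Plain-Scala sketch of the per-row logic of Solution 1 (not Spark code).
// Assumption: None stands in for SQL NULL; avgIgnoringNulls is a hypothetical helper.
object RowAverage {
  // Average of the non-null months, or None when every month is null
  // (the Spark version would compute 0.0 / 0, i.e. NaN, in that case).
  def avgIgnoringNulls(months: Seq[Option[Double]]): Option[Double] = {
    val present = months.flatten
    if (present.isEmpty) None else Some(present.sum / present.size)
  }

  val rows: Seq[(String, Seq[Option[Double]])] = Seq(
    ("marta", Seq(None, Some(1.8))),
    ("james", Seq(Some(9.5), None)),
    ("nobody", Seq(None, None)) // hypothetical all-null row, not in the original data
  )

  // like Dataset.map: exactly one (user, average) pair per input row
  val averages: Seq[(String, Option[Double])] =
    rows.map { case (user, months) => (user, avgIgnoringNulls(months)) }

  def main(args: Array[String]): Unit = averages.foreach(println)
}
```

Because map produces one result per input row, the duplicate marta rows are not a problem for this approach: each keeps its own average.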
Solution 2 - withColumn operator with functions
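I think using the map operator is as inefficient as using UDF-based operators. Both deserialize rows from Spark's internal binary row format into JVM objects, so their memory requirements are higher than those of solutions that avoid this copying.
I think the withColumn operator combined with the functions from the functions object can give better performance (and is easier to understand).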
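import org.apache.spark.sql.functions.{array, col, explode}
import spark.implicits._ // for the $"..." column syntax (already in scope in spark-shell)

// pack all month columns (everything but user) into a single array column,
// then explode it into one row per month value
val partial_solution = months.
  withColumn("months", array(months.columns.drop(1).map(col): _*)).
  withColumn("exploded", explode($"months"))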
scala> partial_solution.show
+--------+--------+--------+-----------+--------+
|    user|month_01|month_02|     months|exploded|
+--------+--------+--------+-----------+--------+
|garrison|     3.2|     3.0| [3.2, 3.0]|     3.2|
|garrison|     3.2|     3.0| [3.2, 3.0]|     3.0|
|   marta|    null|     1.8|[null, 1.8]|    null|
|   marta|    null|     1.8|[null, 1.8]|     1.8|
| garrett|     4.3|     7.8| [4.3, 7.8]|     4.3|
| garrett|     4.3|     7.8| [4.3, 7.8]|     7.8|
|  harold|     4.5|     3.1| [4.5, 3.1]|     4.5|
|  harold|     4.5|     3.1| [4.5, 3.1]|     3.1|
|   marta|     6.7|     7.2| [6.7, 7.2]|     6.7|
|   marta|     6.7|     7.2| [6.7, 7.2]|     7.2|
|    niko|     4.1|     5.8| [4.1, 5.8]|     4.1|
|    niko|     4.1|     5.8| [4.1, 5.8]|     5.8|
|   james|     9.5|    null|[9.5, null]|     9.5|
|   james|     9.5|    null|[9.5, null]|    null|
|   manny|     1.9|     9.8| [1.9, 9.8]|     1.9|
|   manny|     1.9|     9.8| [1.9, 9.8]|     9.8|
| charles|     7.8|     7.6| [7.8, 7.6]|     7.8|
| charles|     7.8|     7.6| [7.8, 7.6]|     7.6|
+--------+--------+--------+-----------+--------+
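To see what array plus explode do to the row count, here is a plain-Scala model (not Spark code) using flatMap. The case classes MonthRow and Exploded are hypothetical, None stands in for null, and only three rows of the months table are used. As in the output above, each input row yields one output row per month, and null month values are kept rather than dropped.

```scala
// Plain-Scala model of array(...) + explode(...); not Spark code.
// Assumption: None stands in for SQL NULL; rows are a subset of the months table.
object ExplodeModel {
  final case class MonthRow(user: String, month01: Option[Double], month02: Option[Double])
  final case class Exploded(row: MonthRow, months: Seq[Option[Double]], exploded: Option[Double])

  val months: Seq[MonthRow] = Seq(
    MonthRow("garrison", Some(3.2), Some(3.0)),
    MonthRow("marta", None, Some(1.8)),
    MonthRow("james", Some(9.5), None)
  )

  // array(month_01, month_02): pack the month columns into one sequence per row;
  // explode(months): emit one output row per element, copying the other columns
  val partialSolution: Seq[Exploded] = months.flatMap { r =>
    val packed = Seq(r.month01, r.month02)
    packed.map(v => Exploded(r, packed, v))
  }

  def main(args: Array[String]): Unit = partialSolution.foreach(println)
}
```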
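However, there is a problem with the dataset: the user column is not unique (marta appears twice), so an aggregation that groups by user would merge the two marta rows into one.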
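The effect of the duplicate key can be reproduced with plain Scala collections. This sketch (not Spark code; sqlAvg is a hypothetical helper mimicking SQL avg, which skips nulls) groups the exploded values of the two marta rows first by user alone, then by (user, month_01):

```scala
// Plain-Scala model (not Spark) of grouping exploded values by a key and averaging.
// Assumption: None stands in for SQL NULL; data are the two marta rows from the table.
object GroupingModel {
  // (user, month_01, exploded) triples for the two marta rows
  val exploded: Seq[(String, Option[Double], Option[Double])] = Seq(
    ("marta", None, None), ("marta", None, Some(1.8)),
    ("marta", Some(6.7), Some(6.7)), ("marta", Some(6.7), Some(7.2))
  )

  // SQL-style avg: ignore nulls, None if nothing is left
  def sqlAvg(xs: Seq[Option[Double]]): Option[Double] = {
    val present = xs.flatten
    if (present.isEmpty) None else Some(present.sum / present.size)
  }

  // group by user only: both marta rows fall into one group, giving one mixed average
  val byUser: Map[String, Option[Double]] =
    exploded.groupBy(_._1).map { case (k, rows) => k -> sqlAvg(rows.map(_._3)) }

  // group by (user, month_01): the two marta rows stay separate
  val byUserAndMonth01: Map[(String, Option[Double]), Option[Double]] =
    exploded.groupBy(t => (t._1, t._2)).map { case (k, rows) => k -> sqlAvg(rows.map(_._3)) }
}
```

The user-only grouping averages 1.8, 6.7 and 7.2 together, while the composite key recovers the per-row averages of 1.8 and 6.95 shown earlier.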
If the other marta in row 5 were excluded, I'd propose the following solution using my beloved window aggregation.
// Assumption: each input row is uniquely identified by the partition key below.
// The second marta row is NOT excluded from the calculation; instead, just assume
// that (user, month_01) is unique. Partitioning by user and all month columns
// together would get closer to that requirement.
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.avg

val userAndMonth01 = Window.partitionBy("user", "month_01")
val solution = partial_solution.
  withColumn("avg", avg("exploded") over userAndMonth01). // avg ignores null values
  select("user", "avg").
  distinct // <-- be careful: distinct may also collapse different input rows that share the same (user, avg)
scala> solution.show
+--------+------------------+
|    user|               avg|
+--------+------------------+
|  harold|               3.8|
|garrison|               3.1|
| garrett|              6.05|
|   manny|5.8500000000000005|
| charles| 7.699999999999999|
|    niko| 4.949999999999999|
|   marta|              6.95|
|   james|               9.5|
|   marta|               1.8|
+--------+------------------+
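Unlike a groupBy, a window aggregation keeps every exploded row and only attaches its partition's average to it, which is why the solution needs distinct at the end. The plain-Scala model below (not Spark code; hypothetical data for a user ann, not from the original table) illustrates one reading of the caveat in the code comment: two different input rows that happen to share the same (user, avg) collapse into a single result after distinct.

```scala
// Plain-Scala model (not Spark code) of
//   avg("exploded") over Window.partitionBy("user", "month_01"), then select("user", "avg").distinct
// Assumption: hypothetical data for a user "ann"; None stands in for SQL NULL.
object WindowDistinctModel {
  final case class ExplodedRow(user: String, month01: Option[Double], exploded: Option[Double])

  // two different input rows for ann: months (1.0, 3.0) and (2.0, 2.0), both averaging 2.0
  val exploded: Seq[ExplodedRow] = Seq(
    ExplodedRow("ann", Some(1.0), Some(1.0)), ExplodedRow("ann", Some(1.0), Some(3.0)),
    ExplodedRow("ann", Some(2.0), Some(2.0)), ExplodedRow("ann", Some(2.0), Some(2.0))
  )

  // SQL-style avg: ignore nulls, None if nothing is left
  def sqlAvg(xs: Seq[Option[Double]]): Option[Double] = {
    val present = xs.flatten
    if (present.isEmpty) None else Some(present.sum / present.size)
  }

  // window aggregation: every row is kept and gets the average of its partition
  val withAvg: Seq[(ExplodedRow, Option[Double])] = {
    val partitions = exploded.groupBy(r => (r.user, r.month01))
    exploded.map(r => (r, sqlAvg(partitions((r.user, r.month01)).map(_.exploded))))
  }

  // select("user", "avg").distinct: both of ann's input rows collapse into one result
  val solution: Seq[(String, Option[Double])] =
    withAvg.map { case (r, a) => (r.user, a) }.distinct
}
```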