【问题标题】:How to calculate average row-wise?如何按行计算平均?
【发布时间】:2017-11-26 09:27:27
【问题描述】:

我正在考虑如何平均多列的值并将其放入一列。但是,如果有一个空值,我想将它排除在平均计算之外。更具体地说,该表如下所示:

+---------------+---------------+---------------+
|           user|       month_01|       month_02|
+---------------+---------------+---------------+
|       garrison|            3.2|            3.0|
|          marta|           null|            1.8|
|        garrett|            4.3|            7.8|
|         harold|            4.5|            3.1|
|          marta|            6.7|            7.2|
|           niko|            4.1|            5.8|
|          james|            9.5|           null|
|          manny|            1.9|            9.8|
|        charles|            7.8|            7.6| ...
+---------------+---------------+---------------+

我想对所有月份进行平均,得到一个包含两列的最终表格,一列是用户,另一列是所有月份值的平均值。但是,我不希望空值成为一个因素,因此如果用户行在所有月份中都有一个空值,那么您只需除以 11。我正在努力思考如何使用 Spark 来做到这一点。决赛桌将如下所示:

+---------------+---------------+
|           user|        average|
+---------------+---------------+
|       garrison|           34.9|
|          marta|            2.3|
|        garrett|           4.43|
|         harold|            8.5|
|          marta|            6.0|
|           niko|            1.1|
|          james|            3.2|
|          manny|            0.7|
|        charles|            7.1|
+---------------+---------------+

所以平均列是每个用户的行中所有值的平均值。

【问题讨论】:

    标签: java apache-spark dataset apache-spark-sql


    【解决方案1】:

    (我使用 Scala 作为与所要求的相反的编程语言,即 Java)

    解决方案 1 - 地图运算符

    我想到的一个解决方案是使用map 运算符。

    ma​​p[U](func: (T) ⇒ U)(implicit arg0: Encoder[U]): Dataset[U] 返回一个新的 Dataset,其中包含将 func 应用于每个元素。

    所以解决方案如下:

    scala> months.show
    +--------+--------+--------+
    |    user|month_01|month_02|
    +--------+--------+--------+
    |garrison|     3.2|     3.0|
    |   marta|    null|     1.8|
    | garrett|     4.3|     7.8|
    |  harold|     4.5|     3.1|
    |   marta|     6.7|     7.2|
    |    niko|     4.1|     5.8|
    |   james|     9.5|    null|
    |   manny|     1.9|     9.8|
    | charles|     7.8|     7.6|
    +--------+--------+--------+
    
    val solution = months.map { r =>
      val skipUserColumn = 1
      // be generic as much as possible
      // the number of months can be any number
      val monthsCount = r.size - skipUserColumn
      val nullCount = (skipUserColumn until r.size).count(r.isNullAt)
      val sum = (skipUserColumn until r.size).
        foldLeft(0.0) { 
          case (sum, idx) if !r.isNullAt(idx) => sum + r.getDouble(idx)
          case (sum, idx) => sum
        }
      (r.getString(0), sum / (monthsCount - nullCount))
    }.toDF("user", "month_avg")
    scala> solution.show
    +--------+------------------+
    |    user|         month_avg|
    +--------+------------------+
    |garrison|               3.1|
    |   marta|               1.8|
    | garrett|              6.05|
    |  harold|               3.8|
    |   marta|              6.95|
    |    niko| 4.949999999999999|
    |   james|               9.5|
    |   manny|5.8500000000000005|
    | charles| 7.699999999999999|
    +--------+------------------+
    

    解决方案 2 - 带有函数的 withColumn 运算符

    认为使用map 运算符与基于UDF 的运算符一样无效。它们都在 JVM 上加载二进制行,因此内存需求高于避免复制的解决方案(从内部二进制行格​​式到 JVM 对象)。

    认为withColumn 运算符与functions 对象的执行成本可以提供更好的性能(并且更容易理解)。

    val partial_solution = months.
      withColumn("months", array(months.columns.drop(1).map(col): _*)).
      withColumn("exploded", explode($"months"))
    scala> partial_solution.show
    +--------+--------+--------+-----------+--------+
    |    user|month_01|month_02|     months|exploded|
    +--------+--------+--------+-----------+--------+
    |garrison|     3.2|     3.0| [3.2, 3.0]|     3.2|
    |garrison|     3.2|     3.0| [3.2, 3.0]|     3.0|
    |   marta|    null|     1.8|[null, 1.8]|    null|
    |   marta|    null|     1.8|[null, 1.8]|     1.8|
    | garrett|     4.3|     7.8| [4.3, 7.8]|     4.3|
    | garrett|     4.3|     7.8| [4.3, 7.8]|     7.8|
    |  harold|     4.5|     3.1| [4.5, 3.1]|     4.5|
    |  harold|     4.5|     3.1| [4.5, 3.1]|     3.1|
    |   marta|     6.7|     7.2| [6.7, 7.2]|     6.7|
    |   marta|     6.7|     7.2| [6.7, 7.2]|     7.2|
    |    niko|     4.1|     5.8| [4.1, 5.8]|     4.1|
    |    niko|     4.1|     5.8| [4.1, 5.8]|     5.8|
    |   james|     9.5|    null|[9.5, null]|     9.5|
    |   james|     9.5|    null|[9.5, null]|    null|
    |   manny|     1.9|     9.8| [1.9, 9.8]|     1.9|
    |   manny|     1.9|     9.8| [1.9, 9.8]|     9.8|
    | charles|     7.8|     7.6| [7.8, 7.6]|     7.8|
    | charles|     7.8|     7.6| [7.8, 7.6]|     7.6|
    +--------+--------+--------+-----------+--------+
    

    然而,数据集存在一个问题,即 user 列不是唯一的,因此无法使用聚合。

    如果排除第 5 行中的另一个 marta,我会建议使用我心爱的窗口聚合的以下解决方案。

    // Remember user column is now assumed unique
    // I'm however not excluding it from calculation
    // just assume that (user, month_01) would be unique
    // user and all months together could get us closer to the requirement
    import org.apache.spark.sql.expressions.Window
    val userAndMonth01 = Window.partitionBy("user", "month_01")
    val solution = partial_solution.
      withColumn("avg", avg("exploded") over userAndMonth01).
      select("user", "avg").
      distinct  // <-- be careful since we might get non-unique pairs of user and avg
    scala> solution.show
    +--------+------------------+
    |    user|               avg|
    +--------+------------------+
    |  harold|               3.8|
    |garrison|               3.1|
    | garrett|              6.05|
    |   manny|5.8500000000000005|
    | charles| 7.699999999999999|
    |    niko| 4.949999999999999|
    |   marta|              6.95|
    |   james|               9.5|
    |   marta|               1.8|
    +--------+------------------+
    

    【讨论】:

      猜你喜欢
      • 1970-01-01
      • 2019-12-05
      • 1970-01-01
      • 2021-12-11
      • 1970-01-01
      相关资源
      最近更新 更多