【问题标题】:How to sum values of a struct in a nested array in a Spark dataframe?如何对 Spark 数据帧中嵌套数组中的结构值求和?
【发布时间】:2019-03-18 17:44:13
【问题描述】:

这是在 Spark 2.1 中,给定这个输入文件:

`order.json

{"id":1,"price":202.30,"userid":1}
{"id":2,"price":343.99,"userid":1}
{"id":3,"price":399.99,"userid":2}

以及以下数据框:

val order = sqlContext.read.json("order.json")
val df2 = order.select(struct("*") as 'order)
val df3 = df2.groupBy("order.userId").agg( collect_list( $"order").as("array"))

df3有以下内容:

+------+---------------------------+
|userId|array                      |
+------+---------------------------+
|1     |[[1,202.3,1], [2,343.99,1]]|
|2     |[[3,399.99,2]]             |
+------+---------------------------+

和结构:

root
 |-- userId: long (nullable = true)
 |-- array: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- id: long (nullable = true)
 |    |    |-- price: double (nullable = true)
 |    |    |-- userid: long (nullable = true)

现在假设我得到了 df3:

  1. 我想为每个 userId 计算 array.price 的总和,利用每个 userId 行的数组。

  2. 我会将此计算添加到结果数据框中的新列中。就像我已经完成了 df3.withColumn("sum", lit(0)),但是 lit(0) 被我的计算取代了。

它会假设是直截了当的,但我坚持两者。我没有找到任何方法来访问整个数组进行每行的计算(例如使用 foldLeft)。

【问题讨论】:

    标签: arrays scala apache-spark apache-spark-sql


    【解决方案1】:

    我想利用数组来计算每个 userId 的 array.price 的总和

    不幸的是,这里有一个数组对你不利。 Spark SQL 和 DataFrame DSL 都没有提供可直接用于在任意大小的数组上处理此任务而无需先分解 (explode) 的工具。

    您可以使用 UDF:

    import org.apache.spark.sql.Row
    import org.apache.spark.sql.functions.{col, udf}
    
    val totalPrice = udf((xs: Seq[Row]) => xs.map(_.getAs[Double]("price")).sum)
    df3.withColumn("totalPrice", totalPrice($"array"))
    
    +------+--------------------+----------+ 
    |userId|               array|totalPrice|
    +------+--------------------+----------+
    |     1|[[1,202.3,1], [2,...|    546.29|
    |     2|      [[3,399.99,2]]|    399.99|
    +------+--------------------+----------+
    

    或转换为静态类型的Dataset:

    df3
      .as[(Long, Seq[(Long, Double, Long)])]
      .map{ case (id, xs) => (id, xs, xs.map(_._2).sum) }
      .toDF("userId", "array", "totalPrice").show
    
    +------+--------------------+----------+
    |userId|               array|totalPrice|
    +------+--------------------+----------+
    |     1|[[1,202.3,1], [2,...|    546.29|
    |     2|      [[3,399.99,2]]|    399.99|
    +------+--------------------+----------+
    

    如上所述,您分解和聚合:

    import org.apache.spark.sql.functions.{sum, first}
    
    df3
      .withColumn("price", explode($"array.price"))
      .groupBy($"userId")
      .agg(sum($"price"), df3.columns.tail.map(c => first(c).alias(c)): _*)
    
    +------+----------+--------------------+
    |userId|sum(price)|               array|
    +------+----------+--------------------+
    |     1|    546.29|[[1,202.3,1], [2,...|
    |     2|    399.99|      [[3,399.99,2]]|
    +------+----------+--------------------+
    

    但价格昂贵,不使用现有结构。

    你可以使用一个丑陋的技巧:

    import org.apache.spark.sql.functions.{coalesce, lit, max, size}
    
    val totalPrice = (0 to df3.agg(max(size($"array"))).as[Int].first)
      .map(i => coalesce($"array.price".getItem(i), lit(0.0)))
      .foldLeft(lit(0.0))(_ + _)
    
    df3.withColumn("totalPrice", totalPrice)
    
    +------+--------------------+----------+
    |userId|               array|totalPrice|
    +------+--------------------+----------+
    |     1|[[1,202.3,1], [2,...|    546.29|
    |     2|      [[3,399.99,2]]|    399.99|
    +------+--------------------+----------+
    

    但这与其说是真正的解决方案,不如说是一种好奇心。

    【讨论】:

      【解决方案2】:

      Spark 2.4.0 及更高版本

      您现在可以使用AGGREGATE 功能。

      df3.createOrReplaceTempView("orders")
      spark.sql(
          """
            |SELECT
            |    *,
            |    AGGREGATE(`array`, 0.0, (accumulator, item) -> accumulator + item.price) AS totalPrice
            |FROM
            |    orders
            |""".stripMargin).show()
      

      【讨论】:

        猜你喜欢
        • 1970-01-01
        • 2021-11-20
        • 2019-10-31
        • 1970-01-01
        • 2021-07-09
        • 1970-01-01
        • 2017-01-09
        • 2019-04-11
        • 2021-08-09
        相关资源
        最近更新 更多