【问题标题】:Perform multiple aggregations on different columns in same dataframe with alias Spark Scala使用别名 Spark Scala 对同一数据帧中的不同列执行多个聚合
【发布时间】:2019-12-17 02:06:18
【问题描述】:

这是基于来自以下链接的 Sumit 回答的问题

[Spark SQL: apply aggregate functions to a list of columns

这里是详细信息

val Claim1 = StructType(Seq(StructField("pid", StringType, true),StructField("diag1", StringType, 
true),StructField("diag2", StringType, true), StructField("allowed", IntegerType, true), 
StructField("allowed1", IntegerType, true)))

val claimsData1 = Seq(("PID1", "diag1", "diag2", 100, 200), ("PID1", "diag2", "diag3", 300, 600), 
("PID1", "diag1", "diag5", 340, 680), ("PID2", "diag3", "diag4", 245, 490), ("PID2", "diag2", 
"diag1", 124, 248))

val claimRDD1 = sc.parallelize(claimsData1)
val claimRDDRow1 = claimRDD1.map(p => Row(p._1, p._2, p._3, p._4, p._5))
val claimRDD2DF1 = sqlContext.createDataFrame(claimRDDRow1, Claim1)
val exprs = Map("allowed" -> "sum", "allowed1" -> "avg")
claimRDD2DF1.groupBy("pid").agg(exprs) show false

但它没有为命名新列提供别名,我有一个数据框,我需要对一组列执行多个聚合,它可以是多组列的 sum、avg、min、max,所以请让我知道是否有解决上述问题的方法或更好的方法来实现这一点?

提前致谢。

【问题讨论】:

    标签: scala apache-spark apache-spark-sql


    【解决方案1】:

    您的代码稍作修改即可工作,诀窍是调用callUDF,它将聚合函数作为字符串并可以别名:

    val exprs = Map("allowed" -> "sum", "allowed1" -> "avg")
    
    val aggExpr = exprs.map{case (k,v)  => callUDF(v,col(k)).as(k)}.toList
    
    claimRDD2DF1.groupBy("pid").agg(aggExpr.head,aggExpr.tail:_*)
      .show()
    

    或者如果您可以将您的聚合指定为函数对象,则无需使用callUDF

    val aggExpr = Seq(
      ("allowed",sum(_:Column)),
      ("allowed1", avg(_:Column))
    )
      .map{case (k,v)  => v(col(k)).as(k)}
    
    
    claimRDD2DF1.groupBy("pid").agg(aggExpr.head,aggExpr.tail:_*)
      .show()
    

    两个版本都给

    +----+-------+-----------------+
    | pid|allowed|         allowed1|
    +----+-------+-----------------+
    |PID1|    740|493.3333333333333|
    |PID2|    369|            369.0|
    +----+-------+-----------------+
    

    【讨论】:

      【解决方案2】:

      您可以使用alias 如下定义agg 函数列表并使用它们

      import org.apache.spark.sql.functions._
      
      //You should at least know list of columns for particular function   
      val colsToSum = claimRDD2DF1.columns.filter(_.startsWith("a"))
      val colsToAvg = List("allowed", "allowed1")
      
      //define functions and its alias for list of columns 
      val sumList = colsToSum.map(name => sum(name).as(name + "_sum"))
      val avgList = colsToAvg.map(name => avg(name).as(name + "_avg"))
      
      //get a final list of functions
      val exp = sumList  ++ avgList
      
      //Apply list functions in single groupBy 
      claimRDD2DF1.groupBy("pid").agg(exp.head, exp.tail: _*).show(false)
      

      这会给你

      +----+-----------+------------+------------------+-----------------+
      |pid |allowed_sum|allowed1_sum|allowed_avg       |allowed1_avg     |
      +----+-----------+------------+------------------+-----------------+
      |PID1|740        |1480        |246.66666666666666|493.3333333333333|
      |PID2|369        |738         |184.5             |369.0            |
      +----+-----------+------------+------------------+-----------------+
      

      【讨论】:

      • 谢谢 Shankar,我想知道这是最好的方法还是可以做到的任何其他方法,bcs 我有数千列,似乎有点重,寻找更简单的方法定义。
      • 如果你有多个函数,我认为定义函数会是相同的。你可以做的是为特定函数创建一个列列表(如 claimRDD2DF1.columns)并创建一个最终列表。
      • 是的,我正在寻找一些精确的东西,比如我分享的链接中给出的解决方案,但它是多列的单一聚合。
      • 谢谢,这样更好。
      猜你喜欢
      • 1970-01-01
      • 1970-01-01
      • 2023-03-21
      • 2016-04-29
      • 2019-08-09
      • 2020-08-27
      • 2012-09-17
      相关资源
      最近更新 更多