【Question title】: How to include non-aggregated columns after aggregation?
【Posted】: 2020-01-22 18:13:19
【Question】:

I am using spark-sql 2.4.1 and have the following scenario:

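// Imports for year/quarter/sum/lit and the cast types.
// Outside spark-shell, toDF and $ also need: import spark.implicits._
import org.apache.spark.sql.functions._
import org.apache.spark.sql.types.{DoubleType, IntegerType}

val df = Seq(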
  (2010,"2018-11-24",71285,"USA","0.9192019",  "0.1992019",  "0.9955999"),
  (2010,"2017-08-24",71286,"USA","0.9292018",  "0.2992019",  "0.99662018"),
  (2010,"2019-02-24",71287,"USA","0.9392017",  "0.3992019",  "0.99772000")).toDF("seq_id","load_date","company_id","country_code","item1_value","item2_value","item3_value")
.withColumn("item1_value", $"item1_value".cast(DoubleType))
.withColumn("item2_value", $"item2_value".cast(DoubleType))
.withColumn("item3_value", $"item3_value".cast(DoubleType))
.withColumn("fiscal_year", year(col("load_date")).cast(IntegerType))
.withColumn("fiscal_quarter", quarter(col("load_date")).cast(IntegerType))

df.show()


val aggregateColumns = Seq("item1_value","item2_value","item3_value")
var aggDFs = aggregateColumns.map( c => {
    df.groupBy("country_code").agg(lit(c).as("col_name"),sum(c).as("sum_of_column"))
})


var combinedDF = aggDFs.reduce(_ union _)
combinedDF.show

This is the output I get:

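+------------+-----------+------------------+
|country_code|   col_name|     sum_of_column|
+------------+-----------+------------------+
|         USA|item1_value|         2.7876054|
|         USA|item2_value|         0.8976057|
|         USA|item3_value|2.9899400800000002|
+------------+-----------+------------------+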

I also need the other columns in the output, namely "seq_id", "load_date" and "company_id". How can I get them after the DataFrame aggregation?

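【Comments】:

  • What would your output look like once it includes "seq_id" and the other fields?
  • Are we considering a join back to the base DataFrame?
  • Can you add your desired output?
  • You could avoid this expensive reduce(_ union _) (iterative and slow) by creating the required columns item1_value, item2_value, etc.: rather than building a 6-column result DataFrame, build one multi-column dataset that holds each aggregation in its own column. Just saying...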
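
The last comment can be sketched roughly as follows: build one sum expression per column and pass them all to a single groupBy, so no per-column DataFrames or unions are needed. This is only an illustration of that suggestion, not code from the thread. The result column names (sum_item1_value and so on) and the local SparkSession setup are assumptions, and the values are written directly as doubles to skip the casts.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{col, sum}

// Assumption: a local session for a standalone run; in spark-shell this returns the existing one.
val spark = SparkSession.builder().master("local[*]").appName("single-pass-sums").getOrCreate()

// Same rows as in the question, with the item values already typed as Double.
val df = spark.createDataFrame(Seq(
  (2010, "2018-11-24", 71285, "USA", 0.9192019, 0.1992019, 0.9955999),
  (2010, "2017-08-24", 71286, "USA", 0.9292018, 0.2992019, 0.99662018),
  (2010, "2019-02-24", 71287, "USA", 0.9392017, 0.3992019, 0.99772000)
)).toDF("seq_id", "load_date", "company_id", "country_code",
        "item1_value", "item2_value", "item3_value")

val aggregateColumns = Seq("item1_value", "item2_value", "item3_value")

// One sum(...) expression per column; the "sum_" prefix is an illustrative naming choice.
val sumExprs = aggregateColumns.map(c => sum(col(c)).as(s"sum_$c"))

// A single groupBy computes every sum, giving one wide row per country.
val wideSums = df.groupBy("country_code").agg(sumExprs.head, sumExprs.tail: _*)
wideSums.show(false)
```

The result has one row per country_code with one column per aggregated item, which is the wide layout the comment describes. It still drops seq_id, load_date and company_id, so it does not answer the question by itself.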

Tags: apache-spark apache-spark-sql


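【Solution 1】:

You can use a window function to keep the non-aggregated columns. In other words, the sum is shown on every row instead of collapsing the rows of each group.

See whether the code snippet below helps: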

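// Outside spark-shell, toDF and $ also need: import spark.implicits._
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions._
import org.apache.spark.sql.types.{DoubleType, IntegerType}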

val df = Seq(
  (2010,"2018-11-24",71285,"USA","0.9192019",  "0.1992019",  "0.9955999"),
  (2010,"2017-08-24",71286,"USA","0.9292018",  "0.2992019",  "0.99662018"),
  (2010,"2019-02-24",71287,"USA","0.9392017",  "0.3992019",  "0.99772000")).
  toDF("seq_id","load_date","company_id","country_code","item1_value","item2_value","item3_value").
  withColumn("item1_value", $"item1_value".cast(DoubleType)).
  withColumn("item2_value", $"item2_value".cast(DoubleType)).
  withColumn("item3_value", $"item3_value".cast(DoubleType)).
  withColumn("fiscal_year", year(col("load_date")).cast(IntegerType)).
  withColumn("fiscal_quarter", quarter(col("load_date")).cast(IntegerType))


val byCountry = Window.partitionBy(col("country_code"))

val aggregateColumns = Seq("item1_value","item2_value","item3_value")
var aggDFs = aggregateColumns.map( c => {
    df.withColumn("col_name",lit(c)).withColumn("sum_country", sum(c) over byCountry)
})

var combinedDF = aggDFs.reduce(_ union _)

combinedDF.
select("seq_id","load_date","company_id","country_code","col_name","sum_country").
distinct.show(100,false)

The output looks like this:

+------+----------+----------+------------+-----------+------------------+
|seq_id|load_date |company_id|country_code|col_name   |sum_country       |
+------+----------+----------+------------+-----------+------------------+
|2010  |2019-02-24|71287     |USA         |item1_value|2.7876054         |
|2010  |2018-11-24|71285     |USA         |item1_value|2.7876054         |
|2010  |2017-08-24|71286     |USA         |item1_value|2.7876054         |
|2010  |2018-11-24|71285     |USA         |item2_value|0.8976057000000001|
|2010  |2019-02-24|71287     |USA         |item2_value|0.8976057000000001|
|2010  |2017-08-24|71286     |USA         |item2_value|0.8976057000000001|
|2010  |2019-02-24|71287     |USA         |item3_value|2.9899400800000002|
|2010  |2018-11-24|71285     |USA         |item3_value|2.9899400800000002|
|2010  |2017-08-24|71286     |USA         |item3_value|2.9899400800000002|
+------+----------+----------+------------+-----------+------------------+
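
One of the question comments asks about joining back to the base DataFrame. A minimal sketch of that alternative, assuming the sums are wanted as extra columns on each original row, is to aggregate once per country and join the totals back on country_code. The names totals, withTotals and the sum_ prefix are illustrative, and the local SparkSession setup is an assumption.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{col, sum}

// Assumption: a local session for a standalone run; in spark-shell this returns the existing one.
val spark = SparkSession.builder().master("local[*]").appName("join-back-sums").getOrCreate()

// Same rows as in the question, with the item values already typed as Double.
val df = spark.createDataFrame(Seq(
  (2010, "2018-11-24", 71285, "USA", 0.9192019, 0.1992019, 0.9955999),
  (2010, "2017-08-24", 71286, "USA", 0.9292018, 0.2992019, 0.99662018),
  (2010, "2019-02-24", 71287, "USA", 0.9392017, 0.3992019, 0.99772000)
)).toDF("seq_id", "load_date", "company_id", "country_code",
        "item1_value", "item2_value", "item3_value")

val aggregateColumns = Seq("item1_value", "item2_value", "item3_value")
val sumExprs = aggregateColumns.map(c => sum(col(c)).as(s"sum_$c"))

// Step 1: one row of totals per country.
val totals = df.groupBy("country_code").agg(sumExprs.head, sumExprs.tail: _*)

// Step 2: join on the grouping key so every original row carries its country's totals.
val withTotals = df.join(totals, Seq("country_code"))

withTotals
  .select("seq_id", "load_date", "company_id", "country_code",
          "sum_item1_value", "sum_item2_value", "sum_item3_value")
  .show(false)
```

Compared with the window version, this yields one row per input row with the sums side by side rather than one row per (input row, column name) pair. Which layout fits better depends on the desired output, which the question does not specify.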

【Comments】:

    【Solution 2】:

    Replace your code with the snippet below:

    scala> import org.apache.spark.sql.expressions.Window
    scala> val W = Window.partitionBy("country_code")
    scala> val aggDFs = aggregateColumns.map( c => {
         | df.withColumn("col_name", lit(c)).withColumn("sum_of_column" ,sum(c).over(W)).select("seq_id","load_date", "company_id","col_name","sum_of_column")
         | })
    
    scala> val combinedDF = aggDFs.reduce(_ union _)
    scala> combinedDF.show()
    +------+----------+----------+-----------+------------------+                   
    |seq_id| load_date|company_id|   col_name|     sum_of_column|
    +------+----------+----------+-----------+------------------+
    |  2010|2018-11-24|     71285|item1_value|         2.7876054|
    |  2010|2017-08-24|     71286|item1_value|         2.7876054|
    |  2010|2019-02-24|     71287|item1_value|         2.7876054|
    |  2010|2018-11-24|     71285|item2_value|         0.8976057|
    |  2010|2017-08-24|     71286|item2_value|         0.8976057|
    |  2010|2019-02-24|     71287|item2_value|         0.8976057|
    |  2010|2018-11-24|     71285|item3_value|2.9899400800000002|
    |  2010|2017-08-24|     71286|item3_value|2.9899400800000002|
    |  2010|2019-02-24|     71287|item3_value|2.9899400800000002|
    +------+----------+----------+-----------+------------------+
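
Both answers still build one DataFrame per column and union them. A possible way to get the same long layout in one pass is to add the window sums as extra columns first and then explode an array of (col_name, sum_of_column) structs. This is a sketch under assumptions, not part of either answer: the intermediate names sum_<column> and kv are illustrative, and the local SparkSession setup is assumed.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.{array, col, explode, lit, struct, sum}

// Assumption: a local session for a standalone run; in spark-shell this returns the existing one.
val spark = SparkSession.builder().master("local[*]").appName("window-sums-long").getOrCreate()

// Same rows as in the question, with the item values already typed as Double.
val df = spark.createDataFrame(Seq(
  (2010, "2018-11-24", 71285, "USA", 0.9192019, 0.1992019, 0.9955999),
  (2010, "2017-08-24", 71286, "USA", 0.9292018, 0.2992019, 0.99662018),
  (2010, "2019-02-24", 71287, "USA", 0.9392017, 0.3992019, 0.99772000)
)).toDF("seq_id", "load_date", "company_id", "country_code",
        "item1_value", "item2_value", "item3_value")

val aggregateColumns = Seq("item1_value", "item2_value", "item3_value")
val byCountry = Window.partitionBy("country_code")

// Step 1: add one per-country window sum column for each item column.
val withSums = aggregateColumns.foldLeft(df) { (acc, c) =>
  acc.withColumn(s"sum_$c", sum(col(c)).over(byCountry))
}

// Step 2: pair each column name with its sum and explode the pairs into rows.
val pairs = aggregateColumns.map { c =>
  struct(lit(c).as("col_name"), col(s"sum_$c").as("sum_of_column"))
}
val longDF = withSums
  .select(col("seq_id"), col("load_date"), col("company_id"),
          explode(array(pairs: _*)).as("kv"))
  .select(col("seq_id"), col("load_date"), col("company_id"),
          col("kv.col_name"), col("kv.sum_of_column"))

longDF.show(false)
```

The window sums are computed in a separate step before the explode so that the generator only references ordinary columns. Row order in the output is not guaranteed to match the union-based version.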
    

    【Comments】:
