【问题标题】:How to perform aggregation (sum) on different columns and group the result based on another column of a spark dataframe?如何对不同的列执行聚合(求和)并根据 spark 数据帧的另一列对结果进行分组?
【发布时间】:2023-03-21 14:29:02
【问题描述】:

使用 scala-spark,我在 postgres 中读取了一个表并形成了一个数据框:locationDF,其中包含与以下格式的位置相关的数据。

val opts = Map("url" -> "databaseurl","dbtable" -> "locations")
val locationDF = spark.read.format("jdbc").options(opts).load()
locationDF.printSchema()
root
 |-- locn_id: integer (nullable = true)
 |-- start_date: string (nullable = true)
 |-- work_min: double (nullable = true)
 |-- coverage: double (nullable = true)
 |-- speed: double (nullable == true)

初始数据:

+-------------+----------+-------------------+----------------+------------------+
|      locn_id|start_date|           work_min|        coverage|             speed|
+-------------+----------+-------------------+----------------+------------------+
|            3|2012-02-22|  53.62948333333333|          13.644|3.9306276263070457|
|            7|2012-02-22|0.11681666666666667|             0.0|               0.0|
|            1|2012-02-21| 22.783333333333335|             2.6| 8.762820512820513|
|            1|2012-01-21| 23.033333333333335|             2.6|  8.85897435897436|
|            1|2012-01-21|  44.98533333333334|            6.99| 6.435670004768718|
|            4|2012-02-21| 130.34788333333333|           54.67| 2.384267117858667|
|            2|2012-01-21|           94.61035|           8.909|10.619637445280052|
|            1|2012-02-21|                0.0|             0.0|               0.0|
|            1|2012-02-21|            29.3377|           4.579| 6.407010264249837|
|            1|2012-01-21|  59.13276666666667|           8.096| 7.303948451910409|
|            2|2012-03-21| 166.41843333333333|          13.048|12.754325056202738|
|            1|2012-03-21| 14.853183333333334|           2.721| 5.458722283474213|
|            9|2012-03-21|            1.69895|           0.845|2.0105917159763314|
+-------------+----------+-------------------+----------------+------------------+

我正在尝试执行 work_min 的总和(并转换为小时数)、覆盖率总和、特定年份和月份的平均速度,并形成另一个数据框。 为此,我将月份和年份与日期列分开:start_date 如下所示,并得到两列:年份和月份。

locationDF.withColumn("year", date_format(to_date($"start_date"), "yyyy").cast(("整数"))).withColumn("月", date_format(to_date($"start_date"), "MM").cast(("Integer")))

+-------------+----------+-------------------+----------------+------------------+----+-----+
|      locn_id|start_date|           work_min|        coverage|             speed|year|month|
+-------------+----------+-------------------+----------------+------------------+----+-----+
|            3|2012-02-22|  53.62948333333333|          13.644|3.9306276263070457|2012|    2|
|            7|2012-02-22|0.11681666666666667|             0.0|               0.0|2012|    2|
|            1|2012-02-21| 22.783333333333335|             2.6| 8.762820512820513|2012|    2|
|            1|2012-01-21| 23.033333333333335|             2.6|  8.85897435897436|2012|    1|
|            1|2012-01-21|  44.98533333333334|            6.99| 6.435670004768718|2012|    1|
|            4|2012-02-21| 130.34788333333333|           54.67| 2.384267117858667|2012|    2|
|            2|2012-01-21|           94.61035|           8.909|10.619637445280052|2012|    1|
|            1|2012-02-21|                0.0|             0.0|               0.0|2012|    2|
|            1|2012-02-21|            29.3377|           4.579| 6.407010264249837|2012|    2|
|            1|2012-01-21|  59.13276666666667|           8.096| 7.303948451910409|2012|    1|
|            2|2012-03-21| 166.41843333333333|          13.048|12.754325056202738|2012|    3|
|            1|2012-03-21| 14.853183333333334|           2.721| 5.458722283474213|2012|    3|
|            9|2012-03-21|            1.69895|           0.845|2.0105917159763314|2012|    3|
+-------------+----------+-------------------+----------------+------------------+----+-----+

但我不明白如何执行聚合 -> 对两个单独的列求和:work_hours 和覆盖率,列的平均值:同时该特定月份的速度并获得如下结果。

+----+-----+-------------+------------+-----------------+
|year|month|sum_work_mins|sum_coverage|        avg_speed|
+----+-----+-------------+------------+-----------------+
|2012|    1|221.7617833  |  26.595    |11.07274342031118|
|2012|    2|236.2152166  |  75.493    |7.161575173745354|
|2012|    3|182.9705666  |  16.614    |6.741213018551094|
+----+-----+-------------+------------+-----------------+

谁能告诉我如何实现这一目标?

【问题讨论】:

    标签: scala apache-spark apache-spark-sql


    【解决方案1】:

    我想你正在寻找这个

    scala> dfd.groupBy("year","month").agg(sum("work_min").as("sum_work_min"),sum("coverage").as("sum_coverage"),avg("speed").as("avg_speed")).show
    +----+-----+------------------+------------------+-----------------+
    |year|month|      sum_work_min|      sum_coverage|        avg_speed|
    +----+-----+------------------+------------------+-----------------+
    |2012|    1|221.76178333333334|26.595000000000002|8.304557565233385|
    |2012|    2| 236.2152166666667|            75.493|3.580787586872677|
    |2012|    3|182.97056666666666|            16.614|6.741213018551094|
    +----+-----+------------------+------------------+-----------------+
    

    希望对你有帮助。

    【讨论】:

      猜你喜欢
      • 2020-11-11
      • 2022-01-23
      • 2018-05-22
      • 1970-01-01
      • 2019-08-31
      • 1970-01-01
      • 2020-01-11
      • 1970-01-01
      相关资源
      最近更新 更多