【发布时间】:2023-03-21 14:29:02
【问题描述】:
使用 scala-spark,我在 postgres 中读取了一个表并形成了一个数据框:locationDF,其中包含与以下格式的位置相关的数据。
val opts = Map("url" -> "databaseurl","dbtable" -> "locations")
val locationDF = spark.read.format("jdbc").options(opts).load()
locationDF.printSchema()
root
|-- locn_id: integer (nullable = true)
|-- start_date: string (nullable = true)
|-- work_min: double (nullable = true)
|-- coverage: double (nullable = true)
|-- speed: double (nullable == true)
初始数据:
+-------------+----------+-------------------+----------------+------------------+
| locn_id|start_date| work_min| coverage| speed|
+-------------+----------+-------------------+----------------+------------------+
| 3|2012-02-22| 53.62948333333333| 13.644|3.9306276263070457|
| 7|2012-02-22|0.11681666666666667| 0.0| 0.0|
| 1|2012-02-21| 22.783333333333335| 2.6| 8.762820512820513|
| 1|2012-01-21| 23.033333333333335| 2.6| 8.85897435897436|
| 1|2012-01-21| 44.98533333333334| 6.99| 6.435670004768718|
| 4|2012-02-21| 130.34788333333333| 54.67| 2.384267117858667|
| 2|2012-01-21| 94.61035| 8.909|10.619637445280052|
| 1|2012-02-21| 0.0| 0.0| 0.0|
| 1|2012-02-21| 29.3377| 4.579| 6.407010264249837|
| 1|2012-01-21| 59.13276666666667| 8.096| 7.303948451910409|
| 2|2012-03-21| 166.41843333333333| 13.048|12.754325056202738|
| 1|2012-03-21| 14.853183333333334| 2.721| 5.458722283474213|
| 9|2012-03-21| 1.69895| 0.845|2.0105917159763314|
+-------------+----------+-------------------+----------------+------------------+
我正在尝试执行 work_min 的总和(并转换为小时数)、覆盖率总和、特定年份和月份的平均速度,并形成另一个数据框。 为此,我将月份和年份与日期列分开:start_date 如下所示,并得到两列:年份和月份。
locationDF.withColumn("year", date_format(to_date($"start_date"), "yyyy").cast(("整数"))).withColumn("月", date_format(to_date($"start_date"), "MM").cast(("Integer")))
+-------------+----------+-------------------+----------------+------------------+----+-----+
| locn_id|start_date| work_min| coverage| speed|year|month|
+-------------+----------+-------------------+----------------+------------------+----+-----+
| 3|2012-02-22| 53.62948333333333| 13.644|3.9306276263070457|2012| 2|
| 7|2012-02-22|0.11681666666666667| 0.0| 0.0|2012| 2|
| 1|2012-02-21| 22.783333333333335| 2.6| 8.762820512820513|2012| 2|
| 1|2012-01-21| 23.033333333333335| 2.6| 8.85897435897436|2012| 1|
| 1|2012-01-21| 44.98533333333334| 6.99| 6.435670004768718|2012| 1|
| 4|2012-02-21| 130.34788333333333| 54.67| 2.384267117858667|2012| 2|
| 2|2012-01-21| 94.61035| 8.909|10.619637445280052|2012| 1|
| 1|2012-02-21| 0.0| 0.0| 0.0|2012| 2|
| 1|2012-02-21| 29.3377| 4.579| 6.407010264249837|2012| 2|
| 1|2012-01-21| 59.13276666666667| 8.096| 7.303948451910409|2012| 1|
| 2|2012-03-21| 166.41843333333333| 13.048|12.754325056202738|2012| 3|
| 1|2012-03-21| 14.853183333333334| 2.721| 5.458722283474213|2012| 3|
| 9|2012-03-21| 1.69895| 0.845|2.0105917159763314|2012| 3|
+-------------+----------+-------------------+----------------+------------------+----+-----+
但我不明白如何执行聚合 -> 对两个单独的列求和:work_hours 和覆盖率,列的平均值:同时该特定月份的速度并获得如下结果。
+----+-----+-------------+------------+-----------------+
|year|month|sum_work_mins|sum_coverage| avg_speed|
+----+-----+-------------+------------+-----------------+
|2012| 1|221.7617833 | 26.595 |11.07274342031118|
|2012| 2|236.2152166 | 75.493 |7.161575173745354|
|2012| 3|182.9705666 | 16.614 |6.741213018551094|
+----+-----+-------------+------------+-----------------+
谁能告诉我如何实现这一目标?
【问题讨论】:
标签: scala apache-spark apache-spark-sql