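[Posted]: 2020-06-21 19:33:02
[Question]:
I want to aggregate a PySpark DataFrame with groupby while also dropping duplicates, keeping the last value, based on another column of that DataFrame.
In short, I want to apply dropDuplicates to a GroupedData object, so that within each group only one row is kept per value of some column, chosen dynamically.
Example
For the DataFrame below, a straightforward grouped aggregation would be: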
from pyspark.sql import functions

dataframe = spark.createDataFrame(
    [
        (1, "2020-01-01", 1, 1),
        (2, "2020-01-01", 2, 1),
        (3, "2020-01-02", 1, 1),
        (2, "2020-01-02", 1, 1)
    ],
    ("id", "ts", "feature", "h3")
).withColumn("ts", functions.col("ts").cast("timestamp"))
# +---+-------------------+-------+---+
# | id|                 ts|feature| h3|
# +---+-------------------+-------+---+
# |  1|2020-01-01 00:00:00|      1|  1|
# |  2|2020-01-01 00:00:00|      2|  1|
# |  3|2020-01-02 00:00:00|      1|  1|
# |  2|2020-01-02 00:00:00|      1|  1|
# +---+-------------------+-------+---+

aggregated = dataframe.groupby(
    "h3",
    functions.window(
        timeColumn="ts",
        windowDuration="3 days",
        slideDuration="1 day",
    )
).agg(
    functions.sum("feature")
)
aggregated.show(truncate=False)
which produces the following DataFrame:
+---+------------------------------------------+------------+
|h3 |window |sum(feature)|
+---+------------------------------------------+------------+
|1 |[2019-12-30 00:00:00, 2020-01-02 00:00:00]|3 |
|1 |[2019-12-31 00:00:00, 2020-01-03 00:00:00]|5 |
|1 |[2020-01-01 00:00:00, 2020-01-04 00:00:00]|5 |
|1 |[2020-01-02 00:00:00, 2020-01-05 00:00:00]|2 |
+---+------------------------------------------+------------+
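Problem
I want the aggregation to use only the latest state of each id. Here, id=2 was updated at ts=2020-01-02 00:00:00 to feature=1, so every aggregation whose base timestamp is greater than 2020-01-02 00:00:00 should use only this state of the feature column for id=2. The expected aggregated DataFrame is: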
+---+------------------------------------------+------------+
|h3 |window |sum(feature)|
+---+------------------------------------------+------------+
|1 |[2019-12-30 00:00:00, 2020-01-02 00:00:00]|3 |
|1 |[2019-12-31 00:00:00, 2020-01-03 00:00:00]|3 |
|1 |[2020-01-01 00:00:00, 2020-01-04 00:00:00]|3 |
|1 |[2020-01-02 00:00:00, 2020-01-05 00:00:00]|2 |
+---+------------------------------------------+------------+
How can I do this with PySpark?
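To pin down the semantics being asked for, here is a small plain-Python reference computation. It is not a Spark solution, only a sketch of the intended result. It assumes half-open windows [start, end) aligned to the Unix epoch, which is how Spark's window function behaves by default. The names rows, windows_containing, latest and totals are introduced here purely for illustration.

```python
from datetime import datetime, timedelta

# Same sample data as the question: (id, ts, feature, h3).
rows = [
    (1, datetime(2020, 1, 1), 1, 1),
    (2, datetime(2020, 1, 1), 2, 1),
    (3, datetime(2020, 1, 2), 1, 1),
    (2, datetime(2020, 1, 2), 1, 1),
]

WINDOW = timedelta(days=3)  # windowDuration
SLIDE = timedelta(days=1)   # slideDuration
EPOCH = datetime(1970, 1, 1)


def windows_containing(ts):
    """Yield every half-open sliding window [start, end) that contains ts."""
    start = ts - (ts - EPOCH) % SLIDE  # latest aligned start that is <= ts
    while start + WINDOW > ts:
        yield (start, start + WINDOW)
        start -= SLIDE


# Keep only the newest (ts, feature) per (h3, window, id).
latest = {}
for id_, ts, feature, h3 in rows:
    for win in windows_containing(ts):
        key = (h3, win, id_)
        if key not in latest or ts > latest[key][0]:
            latest[key] = (ts, feature)

# Sum the surviving features per (h3, window).
totals = {}
for (h3, win, _id), (_ts, feature) in latest.items():
    totals[(h3, win)] = totals.get((h3, win), 0) + feature

for (h3, (start, end)), total in sorted(totals.items()):
    print(h3, start, end, total)
```

On the sample data the four windows sum to 3, 3, 3 and 2, matching the expected table above.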
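Update
I assumed that a MapType value in Spark cannot contain duplicate keys. Under that assumption, I thought I could aggregate the columns into a map id -> feature and then aggregate only the map values with sum (or whatever the final aggregation should be).
So I did: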
aggregated = dataframe.groupby(
    "h3",
    functions.window(
        timeColumn="ts",
        windowDuration="3 days",
        slideDuration="1 day",
    )
).agg(
    functions.map_from_entries(
        functions.collect_list(
            functions.struct("id", "feature")
        )
    ).alias("id_feature")
)
aggregated.show(truncate=False)
But then I found that maps can have duplicate keys:
+---+------------------------------------------+--------------------------------+
|h3 |window |id_feature |
+---+------------------------------------------+--------------------------------+
|1 |[2020-01-01 00:00:00, 2020-01-04 00:00:00]|[1 -> 1, 2 -> 2, 3 -> 1, 2 -> 1]|
|1 |[2019-12-31 00:00:00, 2020-01-03 00:00:00]|[1 -> 1, 2 -> 2, 3 -> 1, 2 -> 1]|
|1 |[2019-12-30 00:00:00, 2020-01-02 00:00:00]|[1 -> 1, 2 -> 2] |
|1 |[2020-01-02 00:00:00, 2020-01-05 00:00:00]|[3 -> 1, 2 -> 1] |
+---+------------------------------------------+--------------------------------+
So this does not solve my problem. Instead, I have just found another one: when using the display function in a Databricks notebook, it shows the MapType column without duplicated keys.
[Discussion]:
Tags: dataframe apache-spark pyspark apache-spark-sql databricks