【发布时间】:2022-08-16 13:04:06
【问题描述】:
我的目标是计算另一列,保持与原始 DataFrame 相同的行数,我可以在其中显示过去 30 天每个用户的平均余额。
我想它可以使用窗口函数来完成,按用户分区并以某种方式限制当前日期和 30 天前之间的行,但我不知道如何在 PySpark 中实现它。
我有以下 Spark DataFrame:
| userId | date | balance |
|---|---|---|
| A | 09/06/2020 | 100 |
| A | 03/07/2020 | 200 |
| A | 05/08/2020 | 600 |
| A | 30/08/2020 | 1000 |
| A | 15/09/2020 | 500 |
| B | 03/01/2020 | 100 |
| B | 05/04/2020 | 200 |
| B | 29/04/2020 | 600 |
| B | 01/05/2020 | 1600 |
我想要的输出 DataFrame 将是:
| userId | date | balance | mean_last_30days_balance |
|---|---|---|---|
| A | 09/06/2020 | 100 | 100 |
| A | 03/07/2020 | 200 | 150 |
| A | 05/08/2020 | 600 | 600 |
| A | 30/08/2020 | 1000 | 800 |
| A | 15/09/2020 | 500 | 750 |
| B | 03/01/2020 | 100 | 100 |
| B | 05/04/2020 | 200 | 200 |
| B | 29/04/2020 | 600 | 400 |
| B | 01/05/2020 | 1600 | 800 |
from datetime import datetime
from pyspark.sql import types as T
data = [(\"A\",datetime.strptime(\"09/06/2020\",\'%d/%m/%Y\'),100),
(\"A\",datetime.strptime(\"03/07/2020\",\'%d/%m/%Y\'),200),
(\"A\",datetime.strptime(\"05/08/2020\",\'%d/%m/%Y\'),600),
(\"A\",datetime.strptime(\"30/08/2020\",\'%d/%m/%Y\'),1000),
(\"A\",datetime.strptime(\"15/09/2020\",\'%d/%m/%Y\'),500),
(\"B\",datetime.strptime(\"03/01/2020\",\'%d/%m/%Y\'),100),
(\"B\",datetime.strptime(\"05/04/2020\",\'%d/%m/%Y\'),200),
(\"B\",datetime.strptime(\"29/04/2020\",\'%d/%m/%Y\'),600),
(\"B\",datetime.strptime(\"01/05/2020\",\'%d/%m/%Y\'),1600)]
schema = T.StructType([T.StructField(\"userId\",T.StringType(),True),
T.StructField(\"date\",T.DateType(),True),
T.StructField(\"balance\",T.StringType(),True)
])
sdf_prueba = spark.createDataFrame(data=data,schema=schema)
sdf_prueba.printSchema()
sdf_prueba.orderBy(F.col(\'userId\').asc(),F.col(\'date\').asc()).show(truncate=False)
标签: pyspark apache-spark-sql window-functions