【问题标题】:Time based window function in PysparkPyspark 中基于时间的窗口函数
【发布时间】:2022-08-16 13:04:06
【问题描述】:

我的目标是计算另一列,保持与原始 DataFrame 相同的行数,我可以在其中显示过去 30 天每个用户的平均余额。

我想它可以使用窗口函数来完成,按用户分区并以某种方式限制当前日期和 30 天前之间的行,但我不知道如何在 PySpark 中实现它。

我有以下 Spark DataFrame:

userId date balance
A 09/06/2020 100
A 03/07/2020 200
A 05/08/2020 600
A 30/08/2020 1000
A 15/09/2020 500
B 03/01/2020 100
B 05/04/2020 200
B 29/04/2020 600
B 01/05/2020 1600

我想要的输出 DataFrame 将是:

userId date balance mean_last_30days_balance
A 09/06/2020 100 100
A 03/07/2020 200 150
A 05/08/2020 600 600
A 30/08/2020 1000 800
A 15/09/2020 500 750
B 03/01/2020 100 100
B 05/04/2020 200 200
B 29/04/2020 600 400
B 01/05/2020 1600 800
from datetime import datetime
from pyspark.sql import types as T

data = [(\"A\",datetime.strptime(\"09/06/2020\",\'%d/%m/%Y\'),100),
        (\"A\",datetime.strptime(\"03/07/2020\",\'%d/%m/%Y\'),200),
        (\"A\",datetime.strptime(\"05/08/2020\",\'%d/%m/%Y\'),600),
        (\"A\",datetime.strptime(\"30/08/2020\",\'%d/%m/%Y\'),1000),
        (\"A\",datetime.strptime(\"15/09/2020\",\'%d/%m/%Y\'),500),
        (\"B\",datetime.strptime(\"03/01/2020\",\'%d/%m/%Y\'),100),
        (\"B\",datetime.strptime(\"05/04/2020\",\'%d/%m/%Y\'),200),
        (\"B\",datetime.strptime(\"29/04/2020\",\'%d/%m/%Y\'),600),
        (\"B\",datetime.strptime(\"01/05/2020\",\'%d/%m/%Y\'),1600)]

schema = T.StructType([T.StructField(\"userId\",T.StringType(),True),
                       T.StructField(\"date\",T.DateType(),True), 
                       T.StructField(\"balance\",T.StringType(),True)
                      ])
 
sdf_prueba = spark.createDataFrame(data=data,schema=schema)
sdf_prueba.printSchema()
sdf_prueba.orderBy(F.col(\'userId\').asc(),F.col(\'date\').asc()).show(truncate=False)

    标签: pyspark apache-spark-sql window-functions


    【解决方案1】:

    您可以使用 RANGE BETWEEN 关键字:

    sdf_prueba.createOrReplaceTempView("table1")
    
    spark.sql(
        """SELECT *, mean(balance) OVER (
            PARTITION BY userid 
            ORDER BY CAST(date AS timestamp)  
            RANGE BETWEEN INTERVAL 30 DAYS PRECEDING AND CURRENT ROW
         ) AS mean FROM table1""").show()
    
    
    +------+----------+-------+-----+
    |userId|      date|balance| mean|
    +------+----------+-------+-----+
    |     A|2020-06-09|    100|100.0|
    |     A|2020-07-03|    200|150.0|
    |     A|2020-08-05|    600|600.0|
    |     A|2020-08-30|   1000|800.0|
    |     A|2020-09-15|    500|750.0|
    |     B|2020-01-03|    100|100.0|
    |     B|2020-04-05|    200|200.0|
    |     B|2020-04-29|    600|400.0|
    |     B|2020-05-01|   1600|800.0|
    +------+----------+-------+-----+
    

    如果你想使用pyspark API,你需要 将天数转换为 unix 秒数以使用 rangeBetween

    one_month_in_seconds = 2629743 # ?
    w = (
        Window.partitionBy("userid")
        .orderBy(unix_timestamp(col("date").cast("timestamp")))
        .rangeBetween(-one_month_in_seconds, Window.currentRow)
    )
    
    sdf_prueba.select(col("*"), mean("balance").over(w).alias("mean")).show()
    
    +------+----------+-------+-----+
    |userId|      date|balance| mean|
    +------+----------+-------+-----+
    |     A|2020-06-09|    100|100.0|
    |     A|2020-07-03|    200|150.0|
    |     A|2020-08-05|    600|600.0|
    |     A|2020-08-30|   1000|800.0|
    |     A|2020-09-15|    500|750.0|
    |     B|2020-01-03|    100|100.0|
    |     B|2020-04-05|    200|200.0|
    |     B|2020-04-29|    600|400.0|
    |     B|2020-05-01|   1600|800.0|
    +------+----------+-------+-----+
    

    【讨论】:

    • 它完美地工作。我只是有一个关于你为什么使用数字 2629743 的问题,因为 30 天 = 30*24*60*60 秒,即:2592000 秒
    • 因为它需要 30.44 天的 unix 时间戳。请问你能接受答案吗?
    • 但是你是怎么得出这个数字的?例如,如果我想要 25 天怎么办?有什么公式可以计算这个秒数吗?
    • 将其输入在线计算器,真诚地,这就是我所做的
    • @javier-monsalve,因为您从日期开始,我认为 30*24*3600 和 (31*24*3600-1) 之间的任何数字都可以...
    猜你喜欢
    • 1970-01-01
    • 2021-10-02
    • 2016-05-10
    • 2019-02-09
    • 1970-01-01
    • 2021-12-31
    • 2019-09-25
    • 2021-11-07
    • 2019-08-16
    相关资源
    最近更新 更多