【Question title】: PySpark windowing over datetimes and including windows containing no rows in the results
【Posted】: 2021-10-09 18:13:09
【Question】:

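I am trying to aggregate my data by taking a sum every 30 seconds. I want to know when the result of this aggregation is zero, which happens when there are no rows in that 30-second window.

Here is a minimal working example that shows the result I want using pandas, and where PySpark falls short.

Input data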

import pandas as pd
from pyspark.sql import functions as F

df = pd.DataFrame(
    [
        (17, "2017-03-10T15:27:18+00:00"),
        (13, "2017-03-10T15:27:29+00:00"),
        (25, "2017-03-10T15:27:30+00:00"),
        (101, "2017-03-10T15:29:00+00:00"),
        (99, "2017-03-10T15:29:29+00:00")
    ],
    columns=["dollars", "timestamp"],
)
df["timestamp"] = pd.to_datetime(df["timestamp"])
print(df)
    dollars timestamp
0   17  2017-03-10 15:27:18+00:00
1   13  2017-03-10 15:27:29+00:00
2   25  2017-03-10 15:27:30+00:00
3   101 2017-03-10 15:29:00+00:00
4   99  2017-03-10 15:29:29+00:00

Pandas solution

With pandas, we can use resample to aggregate over 30-second windows and then apply the sum function to those windows (note the results for 2017-03-10 15:28:00+00:00 and 2017-03-10 15:28:30+00:00):

desired_result = df.set_index("timestamp").resample("30S").sum()
desired_result
                            dollars
timestamp   
2017-03-10 15:27:00+00:00   30
2017-03-10 15:27:30+00:00   25
2017-03-10 15:28:00+00:00   0
2017-03-10 15:28:30+00:00   0
2017-03-10 15:29:00+00:00   200
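
Since the goal is to spot windows that are zero because they contain no rows, it may help to keep a row count next to the sum. The following is only a sketch on the same sample data; the column name n_rows and the use of pd.Timedelta as the resampling rule are choices made here, not part of the original question.

```python
import pandas as pd

df = pd.DataFrame(
    {
        "dollars": [17, 13, 25, 101, 99],
        "timestamp": pd.to_datetime(
            [
                "2017-03-10T15:27:18+00:00",
                "2017-03-10T15:27:29+00:00",
                "2017-03-10T15:27:30+00:00",
                "2017-03-10T15:29:00+00:00",
                "2017-03-10T15:29:29+00:00",
            ],
            utc=True,
        ),
    }
)

# One resampler, reused for both aggregations.
resampler = df.set_index("timestamp")["dollars"].resample(pd.Timedelta(seconds=30))

per_window = pd.DataFrame(
    {
        "dollars": resampler.sum(),   # 0 for windows without rows
        "n_rows": resampler.count(),  # non-null values per window (hypothetical column name)
    }
)

# Windows whose sum is zero because nothing fell into them.
empty_windows = per_window.index[per_window["n_rows"] == 0]

print(per_window)
print(list(empty_windows))
```

A window with n_rows equal to 0 is empty, whereas a window with a zero sum and a positive n_rows would contain rows that cancel out.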

PySpark near-solution

In PySpark, we can use pyspark.sql.functions.window to window every 30 seconds (adapted, with thanks, from this Stack Overflow answer), but this misses the windows that contain no rows:

spark: pyspark.sql.session.SparkSession  # I expect you to have set up your session...
sdf = spark.createDataFrame(df)
sdf.groupby(
    F.window("timestamp", windowDuration="30 seconds", slideDuration="30 seconds")
).agg(F.sum("dollars")).display()
window,sum(dollars)
"{""start"":""2017-03-10T15:27:30.000+0000"",""end"":""2017-03-10T15:28:00.000+0000""}",25
"{""start"":""2017-03-10T15:27:00.000+0000"",""end"":""2017-03-10T15:27:30.000+0000""}",30
"{""start"":""2017-03-10T15:29:00.000+0000"",""end"":""2017-03-10T15:29:30.000+0000""}",200

Question

How can I get PySpark to return results for time windows that contain no rows (as pandas does)?

【Discussion】:

    Tags: python pandas dataframe apache-spark pyspark


    【Solution 1】:

    You can use timestamp arithmetic, as mentioned in this answer (I recommend taking a look at it, since it goes into more detail). In your case, it would be:

    from pyspark.sql import functions as F
    
    seconds = 30
    epoch = (F.col("timestamp").cast("timestamp").cast("bigint") / seconds).cast(
        "bigint"
    ) * seconds
    df = spark.createDataFrame(
        [
            (17, "2017-03-10T15:27:18+00:00"),
            (13, "2017-03-10T15:27:29+00:00"),
            (25, "2017-03-10T15:27:30+00:00"),
            (101, "2017-03-10T15:29:00+00:00"),
            (99, "2017-03-10T15:29:29+00:00"),
        ],
        ["dollars", "timestamp"],
    ).withColumn("epoch", epoch)
    
    min_epoch, max_epoch = df.select(F.min("epoch"), F.max("epoch")).first()
    
    ref = spark.range(min_epoch, max_epoch + seconds, seconds).toDF("epoch")
    
    (
        ref.join(df, "epoch", "left")
        .withColumn("ts_resampled", F.timestamp_seconds("epoch"))
        .groupBy("ts_resampled")
        .sum("dollars")
        .orderBy("ts_resampled")
        .fillna(0, subset=["sum(dollars)"])
        .show(truncate=False)
    )
    

    Output

    +-------------------+------------+
    |ts_resampled       |sum(dollars)|
    +-------------------+------------+
    |2017-03-10 12:27:00|30          |
    |2017-03-10 12:27:30|25          |
    |2017-03-10 12:28:00|0           |
    |2017-03-10 12:28:30|0           |
    |2017-03-10 12:29:00|200         |
    +-------------------+------------+
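
The arithmetic behind this answer can be followed without Spark. Below is a minimal pure-Python sketch of the same three steps (floor each timestamp to its window start, build a reference grid of all window starts, then left-join and fill with zero). The helper name floor_epoch is hypothetical and exists only for this illustration.

```python
from datetime import datetime

SECONDS = 30  # window length, as in the answer above

rows = [
    (17, "2017-03-10T15:27:18+00:00"),
    (13, "2017-03-10T15:27:29+00:00"),
    (25, "2017-03-10T15:27:30+00:00"),
    (101, "2017-03-10T15:29:00+00:00"),
    (99, "2017-03-10T15:29:29+00:00"),
]


def floor_epoch(ts: str, width: int = SECONDS) -> int:
    """Return the epoch seconds of the start of the window containing ts."""
    epoch = int(datetime.fromisoformat(ts).timestamp())
    return epoch // width * width


# Step 1: sum dollars per window start. Empty windows get no key here,
# which is the same gap a plain groupBy leaves.
sums = {}
for dollars, ts in rows:
    key = floor_epoch(ts)
    sums[key] = sums.get(key, 0) + dollars

# Step 2: reference grid of every window start from the first to the last.
grid = range(min(sums), max(sums) + SECONDS, SECONDS)

# Step 3: "left join" the grid to the sums, filling missing windows with 0.
resampled = [(start, sums.get(start, 0)) for start in grid]

for start, total in resampled:
    print(start, total)
```

The grid plays the role of ref in the Spark code: the join starts from the grid rather than from the data, so windows without rows survive and only need their null sum replaced by 0.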
    

    【Discussion】:

    • FYI, timestamp_seconds requires Spark 3.1.0+
    【Solution 2】:

    Same solution as Ottovon's, but for Spark 2.4 and without the first action

    from pyspark.sql import functions as F
    
    seconds = 30
    epoch = (F.col("timestamp").cast("timestamp").cast("bigint") / seconds).cast(
        "bigint"
    ) * seconds
    df = spark.createDataFrame(
        [
            (17, "2017-03-10T15:27:18+00:00"),
            (13, "2017-03-10T15:27:29+00:00"),
            (25, "2017-03-10T15:27:30+00:00"),
            (101, "2017-03-10T15:29:00+00:00"),
            (99, "2017-03-10T15:29:29+00:00"),
        ],
        ["dollars", "timestamp"],
    ).withColumn("epoch", epoch)
    
    ref = df.select(
        F.min("epoch").alias("min_epoch"), F.max("epoch").alias("max_epoch")
    ).select(
        F.explode(F.sequence("min_epoch", "max_epoch", F.lit(seconds))).alias("epoch")
    )
    
    ref.show()
    +----------+                                                                    
    |     epoch|
    +----------+
    |1489159620|
    |1489159650|
    |1489159680|
    |1489159710|
    |1489159740|
    +----------+
    
    resampled_df = (
        ref.join(df, "epoch", "left")
        .withColumn("ts_resampled", F.from_unixtime("epoch"))
        .groupBy("ts_resampled")
        .agg(F.coalesce(F.sum("dollars"), F.lit(0)).alias("dollars"))
        .orderBy("ts_resampled")
    )
    
    resampled_df.show()
    +-------------------+-------+                                                   
    |       ts_resampled|dollars|
    +-------------------+-------+
    |2017-03-10 15:27:00|     30|
    |2017-03-10 15:27:30|     25|
    |2017-03-10 15:28:00|      0|
    |2017-03-10 15:28:30|      0|
    |2017-03-10 15:29:00|    200|
    +-------------------+-------+
    

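    【Discussion】:

    • In epoch you are just converting the date to epoch seconds, aren't you? I tried it with a date-only column and it did not seem to work very well. I think it is better to use epoch = F.unix_timestamp(F.col('date'),"yyyy-MM-dd"), which is easier to understand and lets you define the format.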