【问题标题】:Forward Fill New Row to Account for Missing Dates向前填充新行以说明缺少的日期
【发布时间】:2019-03-26 15:56:03
【问题描述】:

我目前有一个由变量“聚合器”按小时增量分组的数据集。这个每小时数据存在差距,我最理想的做法是用映射到 x 列中的变量的前一行向前填充行。

我已经看到使用 PANDAS 解决类似问题的一些解决方案,但理想情况下,我想了解如何最好地使用 pyspark UDF 解决这个问题。

我最初考虑使用 PANDAS 进行以下操作,但也很难实现这一点,只是忽略聚合器作为第一遍:

df = df.set_index(keys=[df.timestamp]).resample('1H', fill_method='ffill')

但理想情况下,我想避免使用 PANDAS。

在下面的示例中,我缺少两行每小时数据(标记为 MISSING)。

| timestamp            | aggregator |
|----------------------|------------|
| 2018-12-27T09:00:00Z | A          |
| 2018-12-27T10:00:00Z | A          |
| MISSING              | MISSING    |
| 2018-12-27T12:00:00Z | A          |
| 2018-12-27T13:00:00Z | A          |
| 2018-12-27T09:00:00Z | B          |
| 2018-12-27T10:00:00Z | B          |
| 2018-12-27T11:00:00Z | B          |
| MISSING              | MISSING    |
| 2018-12-27T13:00:00Z | B          |
| 2018-12-27T14:00:00Z | B          |

这里的预期输出如下:

| timestamp            | aggregator |
|----------------------|------------|
| 2018-12-27T09:00:00Z | A          |
| 2018-12-27T10:00:00Z | A          |
| 2018-12-27T11:00:00Z | A          |
| 2018-12-27T12:00:00Z | A          |
| 2018-12-27T13:00:00Z | A          |
| 2018-12-27T09:00:00Z | B          |
| 2018-12-27T10:00:00Z | B          |
| 2018-12-27T11:00:00Z | B          |
| 2018-12-27T12:00:00Z | B          |
| 2018-12-27T13:00:00Z | B          |
| 2018-12-27T14:00:00Z | B          |

感谢您的帮助。

谢谢。

【问题讨论】:

    标签: pyspark pyspark-sql


    【解决方案1】:

    这是填补缺失时间的解决方案。使用 windows、lag 和 udf。只需稍加修改,它也可以延长至数天。

    from pyspark.sql.window import Window
    from pyspark.sql.types import *
    from pyspark.sql.functions import *
    from dateutil.relativedelta import relativedelta
    
    def missing_hours(t1, t2):
        return [t1 + relativedelta(hours=-x) for x in range(1, t1.hour-t2.hour)]
    
    missing_hours_udf = udf(missing_hours, ArrayType(TimestampType()))
    
    df = spark.read.csv('dates.csv',header=True,inferSchema=True)
    
    window = Window.partitionBy("aggregator").orderBy("timestamp")
    
    df_mising = df.withColumn("prev_timestamp",lag(col("timestamp"),1, None).over(window))\
           .filter(col("prev_timestamp").isNotNull())\
           .withColumn("timestamp", explode(missing_hours_udf(col("timestamp"), col("prev_timestamp"))))\
           .drop("prev_timestamp")
    
    df.union(df_mising).orderBy("aggregator","timestamp").show()
    

    结果

    +-------------------+----------+
    |          timestamp|aggregator|
    +-------------------+----------+
    |2018-12-27 09:00:00|         A|
    |2018-12-27 10:00:00|         A|
    |2018-12-27 11:00:00|         A|
    |2018-12-27 12:00:00|         A|
    |2018-12-27 13:00:00|         A|
    |2018-12-27 09:00:00|         B|
    |2018-12-27 10:00:00|         B|
    |2018-12-27 11:00:00|         B|
    |2018-12-27 12:00:00|         B|
    |2018-12-27 13:00:00|         B|
    |2018-12-27 14:00:00|         B|
    +-------------------+----------+
    

    【讨论】:

    • 您好,谢谢,效果很好。只需对其进行调整以处理日期的交叉,但这是对 range(1, t1.hour-t2.hour) 元素的简单修复。
    猜你喜欢
    • 2017-12-08
    • 2022-12-21
    • 2020-12-07
    • 2023-04-03
    • 2017-09-20
    • 1970-01-01
    • 2017-12-22
    • 1970-01-01
    • 2020-12-04
    相关资源
    最近更新 更多