【问题标题】:PySpark - Creating Specific Time Series Range for GroupsPySpark - 为组创建特定的时间序列范围
【发布时间】:2021-07-23 16:46:30
【问题描述】:

我有 1000 个 Tags 有一个 Timestamp 和一个 Value。对于每个Tags,日期范围是“2020-01-01”,但是对于每个标签来说,数据太多了。我有一个单独的数据框,第一个数据框中的每个标签都有一个 StartEnd

我只需要 1000 个标签数据中的上述日期范围内的数据。我还需要在 Start 日期前 2 天和 End 日期后 1 天填充所需数据框中的时间序列数据。

df1 = spark.createDataFrame(
    [("Tag 1", "2020-05-01", 1), ("Tag 1000", "2021-02-01", 1),
        ("Tag 1", "2020-05-02", 2), ("Tag 1000", "2021-02-02", 2),
        ("Tag 1", "2020-05-03", 3), ("Tag 1000", "2021-02-03", 3),
        ("Tag 1", "2020-05-04", 4), ("Tag 1000", "2021-02-04", 4),
        ("Tag 1", "2020-05-05", 5), ("Tag 1000", "2021-02-05", 5),
        ("Tag 1", "2020-05-06", 6), ("Tag 1000", "2021-02-06", 6)],
    ["Tag", "Timestamp", "Value"])

df2 = spark.createDataFrame(
    [("Tag 1", "2020-05-02", "2020-05-03"), ("Tag 1000", "2021-02-03", "2021-02-04")],
    ["Tag", "Start", "End"])

所需的数据框:

print(df1)

Tag       Timestamp  Value
Tag 1     2020-05-01 1
Tag 1     2020-05-02 2
Tag 1     2020-05-03 3
Tag 1     2020-05-04 4       #Notice day 5 and 6 are not in the df
Tag 1000  2020-02-01 1
Tag 1000  2020-02-02 2
Tag 1000  2020-02-03 3
Tag 1000  2020-02-04 4
Tag 1000  2020-02-05 5       #Notice day 6 are not in the df

这样做只会根据第二个数据框为我提供所需的日期,并将消除 1,000,000 行不会分析的行。

到目前为止,我所理解的是创建窗口。

w = Window().partitionBy("Tag").orderBy("Timestamp")

【问题讨论】:

    标签: python apache-spark pyspark window


    【解决方案1】:

    您需要先使用 to_date 函数将列 timestampstartend 转换为 DateType,添加填充天数使用date_add,最后加入两个数据帧,其中 timestamp 列的日期在填充 startend

    之间
    from pyspark.sql.functions import col, to_date, date_add
    
    # convert to DateType
    df1 = df1.withColumn('timestamp', to_date(col('Timestamp'), "yyyy-MM-dd"))
    # convert to DateType then add padding days
    df2 = (df2
           .withColumn("start_date", date_add(to_date(col('Start'), 'yyyy-MM-dd'), -2))
           .withColumn("end_date", date_add(to_date(col("End"), 'yyyy-MM-dd'), 1)))
    
    df1 = df1.join(df2.withColumnRenamed('Tag', 'Tag2'),
                   [col('Tag') == col('Tag2'), col('timestamp').between(col('start_date'), col('end_date'))],
                   'left_semi')
    df1.show()
    
    +--------+----------+-----+
    |     Tag| timestamp|Value|
    +--------+----------+-----+
    |Tag 1000|2021-02-01|    1|
    |Tag 1000|2021-02-02|    2|
    |Tag 1000|2021-02-03|    3|
    |Tag 1000|2021-02-04|    4|
    |Tag 1000|2021-02-05|    5|
    |   Tag 1|2020-05-01|    1|
    |   Tag 1|2020-05-02|    2|
    |   Tag 1|2020-05-03|    3|
    |   Tag 1|2020-05-04|    4|
    +--------+----------+-----+
    
    

    【讨论】:

      猜你喜欢
      • 2023-03-03
      • 1970-01-01
      • 2020-04-13
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      相关资源
      最近更新 更多