【问题标题】:Resampling PySpark dataframe from months to weeks将 PySpark 数据帧从数月重采样到数周
【发布时间】:2019-09-18 11:45:11
【问题描述】:

输入 pyspark 数据帧每个 key_iddate_month 都有一行。对于一个随机的key_id,它看起来像这样

+--------+-------------+---------+---------+
| key_id | date_month  | value_1 | value_2 |
+--------+-------------+---------+---------+
|      1 | 2019-02-01  |   1.135 | 'a'     |
|      1 | 2019-03-01  |   0.165 | 'b'     |
|      1 | 2019-04-01  |     0.0 | null    |
+--------+-------------+---------+---------+

需要重新采样到每周粒度才能看起来像这样

+--------+-------------+---------+---------+
| key_id |  date_week  | value_1 | value_2 |
+--------+-------------+---------+---------+
|      1 | 2019-02-04  |   1.135 | 'a'     |
|      1 | 2019-02-11  |   1.135 | 'a'     |
|      1 | 2019-02-18  |   1.135 | 'a'     |
|      1 | 2019-02-25  |   1.135 | 'a'     |
|      1 | 2019-03-04  |   0.165 | 'b'     |
|      1 | 2019-03-11  |   0.165 | 'b'     |
|      1 | 2019-03-18  |   0.165 | 'b'     |
|      1 | 2019-03-25  |   0.165 | 'b'     |
|      1 | 2019-04-01  |     0.0 | null    |
|      1 | 2019-04-08  |     0.0 | null    |
|      1 | 2019-04-15  |     0.0 | null    |
|      1 | 2019-04-22  |     0.0 | null    |
|      1 | 2019-04-29  |     0.0 | null    |
+--------+-------------+---------+---------+

目前在 PySpark 数据帧和 Pandas 之间切换大约需要 30 行代码:争论日期范围、连接等。

有没有办法在 PySpark 中以直接的方式做到这一点?

我尝试了Pandas resampling from months to weeks,但是当我的“主键”是date_monthkey_id 的组合时,我不知道如何使它工作。

目前初始数据帧中的行数很少,约为 250K,我猜,转换 PySpark 数据帧 toPandas(),然后在 Pandas 中进行转换是一个可行的选择。

【问题讨论】:

    标签: python pandas dataframe pyspark time-series


    【解决方案1】:

    以下解决方案涉及制作几个月到几周的映射器(其中几周是每月的星期一),并将其加入到您的原始数据中。

    模拟数据的无聊部分:

    ## Replicate data with join trick to get out nulls
    ## Convert string to date format
    
    import pyspark.sql.functions as F
    
    c = ['key_id','date_month','value_1']
    d = [(1,'2019-02-01',1.135),
            (1,'2019-03-01',0.165),
            (1,'2019-04-01',0.0)]
    
    c2 = ['date_month','value_2']
    d2 = [('2019-02-01','a'),
          ('2019-03-01','b')]
    
    df = spark.createDataFrame(d,c)
    df2 = spark.createDataFrame(d2,c2)
    
    test_df = df.join(df2, how = 'left', on = 'date_month')
    
    test_df_date = test_df.withColumn('date_month', F.to_date(test_df['date_month']))
    
    test_df_date.orderBy('date_month').show() 
    

    您的数据:

    +----------+------+-------+-------+
    |date_month|key_id|value_1|value_2|
    +----------+------+-------+-------+
    |2019-02-01|     1|  1.135|      a|
    |2019-03-01|     1|  0.165|      b|
    |2019-04-01|     1|    0.0|   null|
    +----------+------+-------+-------+
    

    使用来自get all the dates between two dates in Spark DataFrame 的巧妙技巧构建映射器

    以一个月的映射器结束,到本月的一周开始(您可以直接对原始数据执行此操作,而不是创建映射器。)

    ## Build month to week mapper
    
    ## Get first and last of each month, and number of days between
    months = test_df_date.select('date_month').distinct()
    months = months.withColumn('date_month_end', F.last_day(F.col('date_month')))
    months = months.withColumn('days', F.datediff(F.col('date_month_end'), 
                                                  F.col('date_month')))
    
    ## Use trick from https://stackoverflow.com/questions/51745007/get-all-the-dates-between-two-dates-in-spark-dataframe 
    ## Adds a column 'day_in_month' with all days in the month from first to last. 
    ## 
    months = months.withColumn("repeat", F.expr("split(repeat(',', days), ',')"))\
        .select("*", F.posexplode("repeat").alias("day_in_month", "val"))\
        .drop("repeat", "val", "days")\
        .withColumn("day_in_month", F.expr("date_add(date_month, day_in_month)"))\
    
    ## Add integer day of week value - Sunday == 1, Monday == 2,
    ## Filter by mondays,
    ## Rename and drop columns 
    months = months.withColumn('day', F.dayofweek(F.col('day_in_month')))
    months = months.filter(F.col('day') == 2)
    month_week_mapper = months.withColumnRenamed('day_in_month', 'date_week')\
        .drop('day', 'date_month_end')
    
    month_week_mapper.orderBy('date_week').show()
    

    映射器如下:

    +----------+----------+
    |date_month| date_week|
    +----------+----------+
    |2019-02-01|2019-02-04|
    |2019-02-01|2019-02-11|
    |2019-02-01|2019-02-18|
    |2019-02-01|2019-02-25|
    |2019-03-01|2019-03-04|
    |2019-03-01|2019-03-11|
    |2019-03-01|2019-03-18|
    |2019-03-01|2019-03-25|
    |2019-04-01|2019-04-01|
    |2019-04-01|2019-04-08|
    |2019-04-01|2019-04-15|
    |2019-04-01|2019-04-22|
    |2019-04-01|2019-04-29|
    +----------+----------+
    

    然后我们对原始数据执行左连接,每个月都会连接到各自的每个星期。最后一行只是删除多余的列,并重新排序行/列以匹配您想要的输出。

    ## Perform the join, and do some cleanup to get results into order/format specified above. 
    out_df = test_df_date.join(month_week_mapper, on = 'date_month', how = 'left')
    
    out_df.drop('date_month')\
        .select('key_id','date_week','value_1','value_2')\
        .orderBy('date_week')\
        .show()
    
    ## Gives me an output of:
    +------+----------+-------+-------+
    |key_id| date_week|value_1|value_2|
    +------+----------+-------+-------+
    |     1|2019-02-04|  1.135|      a|
    |     1|2019-02-11|  1.135|      a|
    |     1|2019-02-18|  1.135|      a|
    |     1|2019-02-25|  1.135|      a|
    |     1|2019-03-04|  0.165|      b|
    |     1|2019-03-11|  0.165|      b|
    |     1|2019-03-18|  0.165|      b|
    |     1|2019-03-25|  0.165|      b|
    |     1|2019-04-01|    0.0|   null|
    |     1|2019-04-08|    0.0|   null|
    |     1|2019-04-15|    0.0|   null|
    |     1|2019-04-22|    0.0|   null|
    |     1|2019-04-29|    0.0|   null|
    +------+----------+-------+-------+
    
    

    这应该适用于您的 KeyID 列,但您需要使用一些稍微不同的数据对其进行测试才能确定。

    我肯定会提倡像上面那样做,而不是转换为 Pandas 并再次返回。 df.toPandas 非常慢,如果你的数据量随着时间的推移而增加,Pandas 方法有时会失败,你(或维护代码的人)无论如何都会遇到这个问题。

    【讨论】:

      猜你喜欢
      • 1970-01-01
      • 2020-09-19
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 2020-12-28
      • 2017-09-19
      • 1970-01-01
      • 1970-01-01
      相关资源
      最近更新 更多