【问题标题】:PySpark - Getting the latest date less than another given datePySpark - 获取小于另一个给定日期的最新日期
【发布时间】:2021-08-06 13:21:50
【问题描述】:

我需要一些帮助。我有两个数据框,一个有几个日期,另一个有我的重要数据,按日期分类。

它是这样的:

第一个df,带有相关数据

+------+----------+---------------+
|    id| test_date|          score|
+------+----------+---------------+
|     1|2021-03-31|             94|
|     1|2021-01-31|             93|
|     1|2020-12-31|            100|
|     1|2020-06-30|             95|
|     1|2019-10-31|             58|
|     1|2017-10-31|             78|
|     2|2020-01-31|             79|
|     2|2018-03-31|             66|
|     2|2016-05-31|             77|
|     3|2021-05-31|             97|
|     3|2020-07-31|            100|
|     3|2019-07-31|             99|
|     3|2019-06-30|             98|
|     3|2018-07-31|             91|
|     3|2018-02-28|             86|
|     3|2017-11-30|             82|
+------+----------+---------------+

第二个df,带日期

+--------------+--------------+--------------+
|   eval_date_1|   eval_date_2|   eval_date_3|
+--------------+--------------+--------------+
|    2021-01-31|    2020-10-31|    2019-06-30|
+--------------+--------------+--------------+

需要的DF

+------+--------------+---------+--------------+---------+--------------+---------+
|    id|   eval_date_1| score_1 |   eval_date_2| score_2 |   eval_date_3| score_3 |
+------+--------------+---------+--------------+---------+--------------+---------+
|     1|    2021-01-31|       93|    2020-10-31|       95|    2019-06-30|       78|
|     2|    2021-01-31|       79|    2020-10-31|       79|    2019-06-30|       66|
|     3|    2021-01-31|      100|    2020-10-31|      100|    2019-06-30|       98|
+------+--------------+---------+--------------+---------+--------------+---------+

因此,例如,对于第一个 id,所需的 df 从第一个 df 的第二、第四和第六行获取分数。这些是在第二个 df 上保持等于或低于 eval_date 的最新日期。

【问题讨论】:

    标签: pyspark


    【解决方案1】:

    假设 df 是您的主要数据框,而 df_date 是仅包含日期的数据框。

    from functools import reduce
    
    from pyspark.sql import functions as F, Window as W
    
    
    df_final = reduce(
        lambda a, b: a.join(b, on="id"),
        (
            df.join(
                F.broadcast(df_date.select(f"eval_date_{i}")),
                on=F.col(f"eval_date_{i}") >= F.col("test_date"),
            )
            .withColumn(
                "rnk",
                F.row_number().over(W.partitionBy("id").orderBy(F.col("test_date").desc())),
            )
            .where("rnk=1")
            .select("id", f"eval_date_{i}", "score")
            for i in range(1, 4)
        ),
    )
    
    df_final.show()
    +---+-----------+-----+-----------+-----+-----------+-----+                     
    | id|eval_date_1|score|eval_date_2|score|eval_date_3|score|
    +---+-----------+-----+-----------+-----+-----------+-----+
    |  1| 2021-01-31|   93| 2020-10-31|   95| 2019-06-30|   78|
    |  3| 2021-01-31|  100| 2020-10-31|  100| 2019-06-30|   98|
    |  2| 2021-01-31|   79| 2020-10-31|   79| 2019-06-30|   66|
    +---+-----------+-----+-----------+-----+-----------+-----+
    

    【讨论】:

    • 谢谢,成功了!为了清楚起见,如果其他人有类似的问题,您的解决方案称为辅助 df df_date,但在编码中您将其称为 df_score
    猜你喜欢
    • 1970-01-01
    • 2013-06-24
    • 1970-01-01
    • 1970-01-01
    • 2018-06-28
    • 2019-09-13
    • 1970-01-01
    • 2017-11-04
    • 1970-01-01
    相关资源
    最近更新 更多