【问题标题】:Add column to Spark dataframe with the max value that is less than the current record's value将列添加到 Spark 数据帧,其最大值小于当前记录的值
【发布时间】:2021-11-27 16:15:27
【问题描述】:

我有一个类似于以下的 Spark 数据框:

id  claim_id                 service_date                  status   product
123 10606134411906233408    2018-09-17T00:00:00.000+0000    PD      blue
123 10606147900401009928    2019-01-24T00:00:00.000+0000    PD      yellow
123 10606160940704723994    2019-05-23T00:00:00.000+0000    RV      yellow
123 10606171648203079553    2019-08-29T00:00:00.000+0000    RJ      blue
123 10606186611407311724    2020-01-13T00:00:00.000+0000    PD      blue

请原谅我没有粘贴任何代码,因为没有任何效果。我想添加一个新列,其中状态为 PD 的前一行的 max(service_date) 并且当前行的乘积 = 上一行的乘积。

这很容易通过关联子查询完成,但效率不高,此外,在 Spark 中也不可行,因为不支持非 equi 连接。另请注意,LAG 将不起作用,因为我并不总是需要前一个记录(并且偏移量将是动态的)。

预期的输出将是这样的:

id  claim_id                 service_date                  status   product     previous_service_date
    123 10606134411906233408    2018-09-17T00:00:00.000+0000    PD      blue
    123 10606147900401009928    2019-01-24T00:00:00.000+0000    PD      yellow
    123 10606160940704723994    2019-05-23T00:00:00.000+0000    RV      yellow      2019-01-24T00:00:00.000+0000
    123 10606171648203079553    2019-08-29T00:00:00.000+0000    RJ      blue        2018-09-17T00:00:00.000+0000
    123 10606186611407311724    2020-01-13T00:00:00.000+0000    PD      blue        2018-09-17T00:00:00.000+0000

【问题讨论】:

    标签: python apache-spark pyspark databricks


    【解决方案1】:

    您可以尝试以下使用max 作为窗口函数和when(一个案例表达式)但专注于前面的行

    from pyspark.sql import functions as F
    from pyspark.sql import Window
    
    
    df = df.withColumn('previous_service_date',F.max(
        F.when(F.col("status")=="PD",F.col("service_date")).otherwise(None)
    ).over(
        Window.partitionBy("product")
              .rowsBetween(Window.unboundedPreceding,-1)
    ))
    
    df.orderBy('service_date').show(truncate=False)
    
    +---+--------------------+-------------------+------+-------+---------------------+
    |id |claim_id            |service_date       |status|product|previous_service_date|
    +---+--------------------+-------------------+------+-------+---------------------+
    |123|10606134411906233408|2018-09-17 00:00:00|PD    |blue   |null                 |
    |123|10606147900401009928|2019-01-24 00:00:00|PD    |yellow |null                 |
    |123|10606160940704723994|2019-05-23 00:00:00|RV    |yellow |2019-01-24 00:00:00  |
    |123|10606171648203079553|2019-08-29 00:00:00|RJ    |blue   |2018-09-17 00:00:00  |
    |123|10606186611407311724|2020-01-13 00:00:00|PD    |blue   |2018-09-17 00:00:00  |
    +---+--------------------+-------------------+------+-------+---------------------+
    

    编辑 1

    您也可以使用last,如下所示

    df = df.withColumn('previous_service_date',F.last(
        F.when(F.col("status")=="PD" ,F.col("service_date")).otherwise(None),True
    ).over(
        Window.partitionBy("product")
              .orderBy('service_date')
              .rowsBetween(Window.unboundedPreceding,-1)
    ))
    

    让我知道这是否适合你。

    【讨论】:

    • 这按预期工作:谢谢。快速跟进;我可以重构它以使其在产品上也匹配吗?换句话说,提供状态为“PD”的上一个服务日期,但也提供上一个记录的产品与当前记录的产品匹配的位置?
    • @MichaelBass 它目前在partitionBy("product") 的帮助下执行此操作。使用 `F.when(F.col("status")=="PD" ,F.col("service_date"))` 的过滤器被应用,因为唯一考虑的服务日期是那些带有PD状态。您是否还有其他结果不符合预期的测试用例/数据?
    • 对不起我之前的评论,你是对的。我在乱写代码,无意中更改了分区。这很好用,非常感谢。总有一天我会像你一样聪明。
    【解决方案2】:

    您可以将您的 DataFrame copy 转换为新的 DataFrame (df2) 和 join,如下所示:

    (df.join(df2, 
             on = [df.Service_date > df2.Service_date,
                   df.product == df2.product,
                   df2.status == 'PD'],
             how = "left"))
    

    删除重复的列并将df2.Service_date重命名为previous_service_date

    【讨论】:

    • 谢谢。这是一个不错的解决方案,但在我的场景中不起作用,因为它返回多条记录。
    猜你喜欢
    • 2017-07-18
    • 1970-01-01
    • 1970-01-01
    • 2021-03-08
    • 1970-01-01
    • 1970-01-01
    • 2013-09-24
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多