【问题标题】:PySpark new column that selects value from a list of integers从整数列表中选择值的 PySpark 新列
【发布时间】:2019-10-28 02:58:16
【问题描述】:

我在 PySpark 中有以下数据框:

|ID    |YearBLT|MinYear|MaxYear|ADP_Range               |
---------------------------------------------------------
|164876|2010   |2004   |2009   |[2004,2009]             |
|164877|2008   |2000   |2011   |[2000, 2002, 2011]      |
|164878|2000   |2003   |2011   |[2003, 2011]            |
|164879|2013   |1999   |2015   |[2003, 2007, 2015, 1999]|

YearBLT 是房产的建造年份,ADP_Range 代表建筑规范更新的年份,MinYear/MaxYear 代表 ADP 范围的最短和最长年份。

我正在尝试添加一列 (ADP_Year),该列具有最适用的构建代码,其逻辑如下:

  • 如果 YearBLT 小于 MinYear,ADP_Year == "NA"
  • 如果 YearBLT 大于 MaxYear,ADP_Year == Max(ADP_Range)
  • 如果 YearBLT 介于两者之间,则选择 ADP_Range 中 YearBLT 下方最接近的日期

预期的输出如下:

|ID    |YearBLT|MinYear|MaxYear|ADP_Range               |ADP_Year|
------------------------------------------------------------------
|164876|2010   |2004   |2009   |[2004,2009]             |2009    |
|164877|2008   |2000   |2011   |[2000, 2002, 2011]      |2002    |
|164878|2000   |2003   |2011   |[2003, 2011]            |NA      |
|164879|2013   |1999   |2015   |[2003, 2007, 2015, 1999]|2007    |

2010 > MaxYear,所以它从 MaxYear 中选择值,

2008 年介于 2000 年和 2011 年之间;因为有第三个值 2002,所以选择它是因为它比 2000 年更新

2000

2013 年介于 1999 年和 2015 年之间;由于 2007 年和 2015 年有第三和第四个值,因此选择 2007 年

前两种情况很简单,我有它们的工作代码:

dfADP = dfADP.withColumn("ADP_Year",when(dfADP['YearBLT'] < dfADP['MinYear'], lit("NA")\
.when(dfADP['YearBLT'] > dfADP['MaxYear'],dfADP['MaxYear'])))

我正在为此旋转我的轮子,如果这可能的话,我希望得到一些建议。

【问题讨论】:

    标签: python apache-spark pyspark


    【解决方案1】:

    首先让我们找到范围的最大值

    from pyspark.sql.functions import array_max, col, expr, when
    
    max_adp_range = array_max("ADP_Range")
    

    和最接近的值:

    closest_adp_range = array_max(expr("""
        filter(ADP_Range, y -> y < YearBLT)
    """))
    

    并将这两个组合成一个表达式:

    adp_year = when(
        # If the YearBLT is greater than the MaxYear, ADP_Year == Max(ADP_Range)
        col("YearBLT") > col("MaxYear"), max_adp_range
    ).when(
        # If the YearBLT is in between, it chooses 
        # the closest date below the YearBLT in the ADP_Range
        col("YearBLT").between(col("MinYear"), col("MaxYear")), closest_adp_range
    ).otherwise(
       # If the YearBLT is less than the MinYear, ADP_Year == "NA"
       # Note: not required. Included just for clarity.
       None
    )
    

    最终选择:

    df = spark.createDataFrame([                                    
        (164876, 2010, 2004, 2009, [2004,2009]),
        (164877, 2008, 2000, 2011, [2000, 2002, 2011]),   
        (164878, 2000, 2003, 2011, [2003, 2011]),         
        (164879, 2013, 1999, 2015, [2003, 2007, 2015, 1999])
    ], ("id", "YearBLT", "MinYear", "MaxYear", "ADP_Range"))
    
    df.withColumn("ADP_YEAR", adp_year).show()
    

    应该会给出预期的结果:

    +------+-------+-------+-------+--------------------+--------+
    |    id|YearBLT|MinYear|MaxYear|           ADP_Range|ADP_YEAR|
    +------+-------+-------+-------+--------------------+--------+
    |164876|   2010|   2004|   2009|        [2004, 2009]|    2009|
    |164877|   2008|   2000|   2011|  [2000, 2002, 2011]|    2002|
    |164878|   2000|   2003|   2011|        [2003, 2011]|    null|
    |164879|   2013|   1999|   2015|[2003, 2007, 2015...|    2007|
    +------+-------+-------+-------+--------------------+--------+
    

    array_maxfilter 高阶函数都需要 Spark 2.4 或更高版本。在 2.3 或之前,您可以将上述表达式重新定义为

    from pyspark.sql.functions import udf
    
    max_adp_range = udf(max, "bigint")("ADP_Range")
    closest_adp_range = udf(
        lambda xs, y: max(x for x in xs if x < y), "bigint"
    )("ADP_Range", "YearBLT")
    

    但您应该预料到会显着降低性能(单个 udf 应该更快,但仍比原生表达式慢)。

    【讨论】:

      猜你喜欢
      • 1970-01-01
      • 1970-01-01
      • 2021-11-25
      • 2016-07-10
      • 2014-03-13
      • 2022-11-29
      • 2020-08-18
      • 2021-10-13
      • 1970-01-01
      相关资源
      最近更新 更多