【问题标题】:Compare two dataframes row wise using udf (using library whoswho)使用 udf 逐行比较两个数据帧(使用库 whoswho)
【发布时间】:2021-11-29 22:38:52
【问题描述】:

我有两个带有名称的 Spark 数据帧,并希望使用 Python 库 [whoswho][1] 对这些行进行比较

如何创建具有逐行比较的 UDF? (或使用whoswho 库进行比较的更好方法)

df1 = spark.createDataFrame([
 ["Luc Krier"],
 ["Jeanny Thorn"],
 ["Teddy E Beecher"],
 ["Philippe Schauss"],
 ["Meindert I Tholen"],
 ["John I Muller"]
]).toDF("name")

df2 = spark.createDataFrame([
 ["J. Thorn"],
 ["Ben Weller"],
 ["L. Krier"],
 ["J.M. Thorn"],
 ["Liam Muller"],
 ["Meindert Tholen"]
]).toDF("name")

使用 whoswho 的示例:

from whoswho import who
who.match('Luc Krier', 'L. Krier')

【问题讨论】:

    标签: python pyspark user-defined-functions string-comparison


    【解决方案1】:

    如果您只打算过滤来自whoswhoTrue 结果

    您可以交叉连接数据框的行并使用who.match 创建一个UDF,最后仅过滤True 结果

    数据准备

    df1 = sql.createDataFrame([
     ["Luc Krier"],
     ["Jeanny Thorn"],
     [ "Teddy E Beecher"],
     ["Philippe Schauss"],
     ["Meindert I Tholen"],
     ["John I Muller"]
    ]).toDF("name")
    
    df2 = sql.createDataFrame([
     ["J. Thorn"],
     ["Ben Weller"],
     [ "L. Krier"],
     ["J.M. Thorn"],
     ["Liam Muller"],
     ["Meindert Tholen"]
    ]).toDF("name")
    
    
    df1 = df1.withColumn('id',F.lit(1))
    df2 = df2.withColumn('id',F.lit(1))
    
    combined_df = df1.join(df2
                          ,df1['id'] == df2['id']
                          ).select(df1['name'],df2['name'].alias('name_proxy'))
    
    #### 20 records
    combined_df.show()
    
    +----------------+---------------+
    |            name|     name_proxy|
    +----------------+---------------+
    |       Luc Krier|       J. Thorn|
    |       Luc Krier|     Ben Weller|
    |       Luc Krier|       L. Krier|
    |    Jeanny Thorn|       J. Thorn|
    |    Jeanny Thorn|     Ben Weller|
    |    Jeanny Thorn|       L. Krier|
    | Teddy E Beecher|       J. Thorn|
    | Teddy E Beecher|     Ben Weller|
    | Teddy E Beecher|       L. Krier|
    |       Luc Krier|     J.M. Thorn|
    |       Luc Krier|    Liam Muller|
    |       Luc Krier|Meindert Tholen|
    |    Jeanny Thorn|     J.M. Thorn|
    |    Jeanny Thorn|    Liam Muller|
    |    Jeanny Thorn|Meindert Tholen|
    | Teddy E Beecher|     J.M. Thorn|
    | Teddy E Beecher|    Liam Muller|
    | Teddy E Beecher|Meindert Tholen|
    |Philippe Schauss|       J. Thorn|
    |Philippe Schauss|     Ben Weller|
    +----------------+---------------+
    

    熊猫 UDF

    from whoswho import who
    from functools import partial,reduce
    
    schema = StructType([
               StructField('name', StringType(), True),
               StructField('name_proxy', StringType(), True),
               StructField('match', BooleanType(), True)
     ])
    
    def whos_who_match(inp_df,match_columns):
        
        inp_df['match'] = inp_df[match_columns].apply(lambda x : who.match(x[0],x[1]),axis=1)
        
        return inp_df
    
    partial_func = partial(whos_who_match,match_columns=['name','name_proxy'])
    
    combined_df = combined_df.groupby('name').applyInPandas(partial_func,schema)
    
    combined_df.filter(F.col('match') == True).show()
    
    +-----------------+---------------+-----+
    |             name|     name_proxy|match|
    +-----------------+---------------+-----+
    |Meindert I Tholen|Meindert Tholen| true|
    |     Jeanny Thorn|       J. Thorn| true|
    |        Luc Krier|       L. Krier| true|
    +-----------------+---------------+-----+
    
    

    UDF

    from whoswho import who
    
    @F.udf(BooleanType())
    def whos_who_match(x,y):
        return who.match(x,y)
    
    combined_df = combined_df.withColumn('match',whos_who_match(F.col('name'),F.col('name_proxy')))
    #### 20 records
    combined_df.show()
    
    +----------------+---------------+-----+
    |            name|     name_proxy|match|
    +----------------+---------------+-----+
    |       Luc Krier|       J. Thorn|false|
    |       Luc Krier|     Ben Weller|false|
    |       Luc Krier|       L. Krier| true|
    |    Jeanny Thorn|       J. Thorn| true|
    |    Jeanny Thorn|     Ben Weller|false|
    |    Jeanny Thorn|       L. Krier|false|
    | Teddy E Beecher|       J. Thorn|false|
    | Teddy E Beecher|     Ben Weller|false|
    | Teddy E Beecher|       L. Krier|false|
    |       Luc Krier|     J.M. Thorn|false|
    |       Luc Krier|    Liam Muller|false|
    |       Luc Krier|Meindert Tholen|false|
    |    Jeanny Thorn|     J.M. Thorn|false|
    |    Jeanny Thorn|    Liam Muller|false|
    |    Jeanny Thorn|Meindert Tholen|false|
    | Teddy E Beecher|     J.M. Thorn|false|
    | Teddy E Beecher|    Liam Muller|false|
    | Teddy E Beecher|Meindert Tholen|false|
    |Philippe Schauss|       J. Thorn|false|
    |Philippe Schauss|     Ben Weller|false|
    +----------------+---------------+-----+
    

    过滤器

    combined_df.filter(F.col('match') == True).show()
    
    +-----------------+---------------+-----+
    |             name|     name_proxy|match|
    +-----------------+---------------+-----+
    |        Luc Krier|       L. Krier| true|
    |     Jeanny Thorn|       J. Thorn| true|
    |Meindert I Tholen|Meindert Tholen| true|
    +-----------------+---------------+-----+
    

    【讨论】:

    • 是的,交叉连接会导致你的数据集爆炸,我在这样做之前尝试了 orderBy ,但没有运气,如果你可以基于更通用的键加入,结果数据集将显着减少
    • 不确定它会有多少好处,但是是的,cab 也将 udf 转换为 pandas udf
    • 非常感谢。我可以将udf 更改为pandas_udf,因为这很慢吗?你能告诉我一个如何将其更改为“pandas_udf”的示例吗?
    • 你能告诉我一个如何将其更改为“pandas_udf”的示例吗?
    • 更新了答案,还添加了 Pandas UDF,请让我知道时间差异,如果对您有帮助,请点赞答案
    猜你喜欢
    • 1970-01-01
    • 1970-01-01
    • 2019-01-02
    • 2022-01-16
    • 2017-01-20
    • 1970-01-01
    • 1970-01-01
    • 2019-07-21
    • 2020-06-05
    相关资源
    最近更新 更多