【问题标题】:User Defined Function in withColumn called just once rather than per DF rowwithColumn 中的用户定义函数仅调用一次,而不是每个 DF 行
【发布时间】:2019-12-20 04:17:17
【问题描述】:

我有一个用户定义函数的问题,该函数是为连接一个数据帧中的值而构建的,该数据帧与另一个数据帧中的索引值匹配。

以下是我尝试匹配的简化数据框:

a_df:
+-------+------+
| index | name |
+-------+------+    
| 1     | aaa  |
| 2     | bbb  |
| 3     | ccc  |
| 4     | ddd  |
| 5     | eee  |
+-------+------+

b_df:
+-------+------+
| index | code |
+-------+------+    
| 1     | 101  |
| 2     | 102  |
| 3     | 101  |
| 3     | 102  |
| 4     | 103  |
| 4     | 104  |
| 5     | 101  |
+-------+------+

udf 函数和调用:

> def concatcodes(index, dataframe):
>   res = dataframe.where(dataframe.index == index).collect()
>   reslist = "|".join([value.code for value in res])
>   return reslist
> 
> spark.udf.register("concatcodes", concatcodes, StringType())
> 
> resultDF = a_DF.withColumn("codes", lit(concatcodes(a_DF.index, b_df)))

我希望每个 a_DF 数据帧的每一行都调用该函数,从而产生以下输出:

+-------+------+-------+
| index | name |codes  |
+-------+------+-------+    
| 1     | aaa  |101    |
| 2     | bbb  |102    |
| 3     | ccc  |101|102|
| 4     | ddd  |103|104|
| 5     | eee  |101    |
+-------+------+-------+

但是,该函数似乎只被调用一次,整个列作为其参数传递,导致以下输出:

+-------+------+---------------------------+
| index | name |codes                      |
+-------+------+---------------------------+    
| 1     | aaa  |101|102|101|102|103|104|101|    |
| 2     | bbb  |101|102|101|102|103|104|101|
| 3     | ccc  |101|102|101|102|103|104|101|
| 4     | ddd  |101|102|101|102|103|104|101|
| 5     | eee  |101|102|101|102|103|104|101|
+-------+------+---------------------------+

我想在 .withColum 方法中调用 UDF 时我做错了什么,但我不知道是什么 - 我非常感谢有人指出我的逻辑有什么问题。

【问题讨论】:

    标签: python apache-spark pyspark user-defined-functions databricks


    【解决方案1】:

    首先,你 don't need a udf 为这个。您问题的核心本质上是Concatenating string by rows in pysparkjoin。以下将产生所需的输出:

    from pyspark.sql.functions import collect_list, concat_ws
    
    resultDF = a_df.join(
        b_df.groupBy("index").agg(concat_ws("|", collect_list("code")).alias("code")), 
        on="index"
    )
    
    resultDF .show()
    #+-----+----+-------+
    #|index|name|   code|
    #+-----+----+-------+
    #|    3| ccc|101|102|
    #|    5| eee|    101|
    #|    1| aaa|    101|
    #|    4| ddd|103|104|
    #|    2| bbb|    102|
    #+-----+----+-------+
    

    请记住,spark DataFrame 本质上是无序的,除非您使用 sortorderBy 明确引入顺序。


    通过您的尝试解决问题:

    我想在 .withColum 方法中调用 UDF 时我做错了什么,但我不知道是什么

    如果您查看代码的执行计划,您会发现where(dataframe.index == index) 部分基本上被忽略了。

    resultDF = a_DF.withColumn("codes", lit(concatcodes(a_DF.index, b_df)))
    resultDF.explain()
    #== Physical Plan ==
    #*(1) Project [index#0, name#1, 101|102|101|102|103|104|101 AS codes#64]
    #+- Scan ExistingRDD[index#0,name#1]
    

    我怀疑这是由于the python udf being applied in batch mode, rather than on a Row basis。你不能use a Dataframe inside a udf,所以必须发生的是优化器正在运行一次collect,并将其用于所有行。

    这里更大的问题是在udf 内调用collect 的方法违背了spark 的目的(这是您的基本误解)。使用 spark 的全部意义在于将您的计算并行分布到多个执行程序中。当您使用collect 操作时,这会将所有数据带入驱动程序的本地内存中。 (在您的情况下,它似乎随后被广播回执行者)。

    当您需要引用来自多个 spark DataFrame 的数据时,请使用 joins。对于udfs,您可以将它们视为本质上仅用于对单个 spark DataFrame 的单个 Row 进行操作。

    【讨论】:

    • 很棒的答案,感谢您在参考文献中的位置,并在更广泛的背景下了解我的尝试有什么问题 - 建议的解决方案显然有效!
    【解决方案2】:

    这是我的方法

    df = pd.merge(a_df,b_df, on = "index")

    df.groupby("index").agg({"name" : 'first', "code" : list})
    

    结果是

    index name        code
    
    1      aaa       [101]
    2      bbb       [102]
    3      ccc  [101, 102]
    4      ddd  [103, 104]
    5      eee       [101]
    

    【讨论】:

    • 这个问题是关于 spark 数据帧的,而不是 pandasDataFrames
    • 是的,我知道我确实对你的答案投了赞成票(它也应该被接受为答案)。我只是把它放在这里以防万一有人在找它。
    猜你喜欢
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2019-11-07
    • 1970-01-01
    • 2021-12-04
    • 2015-05-26
    • 1970-01-01
    相关资源
    最近更新 更多