【问题标题】:PySpark execute plain Python function on each DataFrame rowPySpark 在每个 DataFrame 行上执行普通 Python 函数
【发布时间】:2020-05-28 18:30:35
【问题描述】:

我有数百万行的 Spark DataFrame DF1。每行最多有 100 列。

col1 | col2 | col3 | ... | colN
--------------------------------
v11  | v12  | v13  | ... | v1N
v21  | v22  | v23  | ... | v2N
...  | ...  | ...  | ... | ...

此外,我还有另一个 DataFrame DF2,其中有数百行包含名称和正文列。 Name 包含函数名称,body 包含纯 Python 代码,返回 true 或 false 的布尔函数。这些函数在它们的逻辑中,可以引用来自 DF1 的单行中的任何列。

func_name | func_body
-----------------------------------------------
func1     |   col2 < col45
func2     |   col11.contains("London") and col32*col15 < col21
funcN     |   .... 

我需要将这两个 DataFrames - DF1 与 DF2 连接起来,并将 Df2 中的每个函数应用到 DF1 中的每一行。每个函数都必须能够接受来自 DF1 的参数,比方说具有键/值对的字典数组,这些键/值对代表 DF1 中相应行的所有列的名称/值。

我知道如何加入 DF1 和 DF2,此外,我知道 Python 函数的执行不会以分布式方式工作。现在没关系。这是一个临时解决方案。我只需要将 DF1 中的所有行分布到工作节点上,然后在 Apache Spark 应用程序的不同任务中将每个 Python 函数应用于 DF1 的每一行。评估 eval() 它们并传递字典数组,其中包含键/值对,如上所述。

一般来说,每个 Python 函数都是一个标签,我想将其分配给 DF1 中的行,以防某些函数返回 true。例如,这是生成的 DataFrame DF3

col1 | col2 | col3 | ... | colN | tags
--------------------------------------
v11  | v12  | v13  | ... | v1N  | [func1, func76, funcN]
v21  | v22  | v23  | ... | v2N  | [func32]
...  | ...  | ...  | ... | ...  | [..., ..., ..., ..., ...]

PySpark 是否可行,如果可以,您能否举例说明如何实现?以DF.columns 中的Map 作为输入参数的UDF 函数是正确的方法还是可以以更简单的方式完成? Spark 对一个时间点可以注册多少 UDF 函数(数量)有任何限制吗?

【问题讨论】:

    标签: python dataframe apache-spark pyspark apache-spark-sql


    【解决方案1】:

    您可以使用可以使用expr 评估的SQL 表达式来实现这一点。但是,您将无法加入 2 个 DataFrame,因为 SQL 表达式无法作为列值进行评估(请参阅此 post),因此您必须将函数收集到一个列表中(因为您只有数百行,它可以放入内存中)。

    这是一个可以根据您的要求进行调整的工作示例:

    data1 = [(1, "val1", 4, 5, "A", 10), (0, "val2", 7, 8, "B", 20),
             (9, "val3", 8, 1, "C", 30), (10, "val4", 2, 9, "D", 30),
             (20, "val5", 6, 5, "E", 50), (3, "val6", 100, 2, "X", 45)]
    
    df1 = spark.createDataFrame(data1, ["col1", "col2", "col3", "col4", "col5", "col6"])
    
    data2 = [("func1", "col1 + col3 = 5 and col2 like '%al1'"),
             ("func2", "col6 = 30 or col1 * col4 > 20"),
             ("func3", "col5 in ('A', 'B', 'C') and col6 - col1 < 30"),
             ("func4", "col2 like 'val%' and col1 > 0")]
    
    df2 = spark.createDataFrame(data2, ["func_name", "func_body"])
    
    # get functions into a list
    functions = df2.collect()
    
    # case/when expression to evaluate the functions
    satisfied_expr = [when(expr(f.func_body), lit(f.func_name)) for f in functions]
    
    # add new column tags
    df1.withColumn("tags", array(*satisfied_expr)) \
        .withColumn("tags", expr("filter(tags, x -> x is not null)")) \
        .show(truncate=False)
    

    添加数组列tags后,filter函数用于删除对应不满足表达式的空值。此功能仅从 Spark 2.4+ 开始可用,对于旧版本,您必须使用 UDF。

    给予:

    +----+----+----+----+----+----+---------------------+
    |col1|col2|col3|col4|col5|col6|tags                 |
    +----+----+----+----+----+----+---------------------+
    |1   |val1|4   |5   |A   |10  |[func1, func3, func4]|
    |0   |val2|7   |8   |B   |20  |[func3]              |
    |9   |val3|8   |1   |C   |30  |[func2, func3, func4]|
    |10  |val4|2   |9   |D   |30  |[func2, func4]       |
    |20  |val5|6   |5   |E   |50  |[func2, func4]       |
    |3   |val6|100 |2   |X   |45  |[func4]              |
    +----+----+----+----+----+----+---------------------+
    

    【讨论】:

    • 感谢您的回答。不幸的是,我无法将提到的 Python 函数重写为 SQL 表达式。我需要将 Python 函数评估为 Python 代码
    • @alexanoid 好的,我明白了。不幸的是,如果您想按原样使用它们,则必须为它们中的每一个注册 UDF。您可以将df1 的所有列作为列表传递给所有 UDF,并在每个 UDF 中获取您想要的列并应用您的逻辑...但是使用这种方法您不会让 spark 优化计算,所以也许你会遇到一些性能问题。
    • 谢谢!老式 UDF 是将遗留逻辑移植到 Spark API 的一种临时解决方案。因此,时间性能问题并不是一个大问题,因为在某个时间点,所有这些功能都将被用 Apache API 编写的新逻辑所取代。一个附加问题 - 是否可以从另一个函数调用一个 UDF 函数?
    • 据我所知,你不能从另一个 UDF 调用一个 UDF...你到底想做什么?
    • 嗨@alexanoid,是的,就是这样!当你调用它时,它仍然被解释为 Python 函数。如果你注册了创建的UDF,那么你可以将它们的名字保存在DF2中,然后使用与上面代码相​​同的逻辑。
    猜你喜欢
    • 1970-01-01
    • 2017-01-27
    • 1970-01-01
    • 1970-01-01
    • 2021-11-04
    • 2018-09-04
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多