【发布时间】:2019-09-05 09:43:14
【问题描述】:
假设您有一个文件,我们将其命名为 udfs.py 并在其中:
def nested_f(x):
return x + 1
def main_f(x):
return nested_f(x) + 1
然后您想从 main_f 函数中创建一个 UDF 并在数据帧上运行它:
import pyspark.sql.functions as fn
import pandas as pd
pdf = pd.DataFrame([[1], [2], [3]], columns=['x'])
df = spark.createDataFrame(pdf)
_udf = fn.udf(main_f, 'int')
df.withColumn('x1', _udf(df['x'])).show()
如果我们在定义两个函数的同一文件中执行此操作(udfs.py),则此操作正常。但是,尝试从不同的文件(比如main.py)执行此操作会产生错误ModuleNotFoundError: No module named ...:
...
import udfs
_udf = fn.udf(udfs.main_f, 'int')
df.withColumn('x1', _udf(df['x'])).show()
我注意到,如果我 实际上嵌套 nested_f 在 main_f 中,如下所示:
def main_f(x):
def nested_f(x):
return x + 1
return nested_f(x) + 1
一切运行正常。然而,我的目标是在多个函数中很好地分离逻辑,我也可以单独测试。
我认为这可以通过使用spark.sparkContext.addPyFile('...udfs.py') 将udfs.py 文件(或整个压缩文件夹)提交给执行者来解决。然而:
- 我觉得这有点啰嗦(尤其是如果您需要压缩文件夹等...)
- 这并不总是容易/可能的(例如,
udfs.py可能正在使用许多其他模块,这些模块也需要提交,从而导致一些连锁反应...) -
addPyFile还存在其他一些不便(例如 autoreload can stop working 等)
所以问题是:有没有办法同时做所有这些:
- 将 UDF 的逻辑很好地拆分为几个 Python 函数
- 使用不同于定义逻辑的文件中的 UDF
- 不需要使用
addPyFile提交任何依赖项
澄清这是如何工作的/为什么这不起作用的奖励积分!
【问题讨论】:
-
在 udfs.py 中将您的函数注册为 UDF。
-
你试过了吗?我认为这行不通。
-
@Ferrad:它有效。在 udfs.py 中注册你的 UDF,然后在其他模块中导入你注册的 udf。
标签: python apache-spark pyspark user-defined-functions