【问题标题】:Calling another custom Python function from Pyspark UDF从 Pyspark UDF 调用另一个自定义 Python 函数
【发布时间】:2019-09-05 09:43:14
【问题描述】:

假设您有一个文件,我们将其命名为 udfs.py 并在其中:

def nested_f(x):
    return x + 1

def main_f(x):
    return nested_f(x) + 1

然后您想从 main_f 函数中创建一个 UDF 并在数据帧上运行它:

import pyspark.sql.functions as fn
import pandas as pd

pdf = pd.DataFrame([[1], [2], [3]], columns=['x'])
df = spark.createDataFrame(pdf)

_udf = fn.udf(main_f, 'int')
df.withColumn('x1', _udf(df['x'])).show()

如果我们在定义两个函数的同一文件中执行此操作(udfs.py),则此操作正常。但是,尝试从不同的文件(比如main.py)执行此操作会产生错误ModuleNotFoundError: No module named ...

...
import udfs

_udf = fn.udf(udfs.main_f, 'int')
df.withColumn('x1', _udf(df['x'])).show()

我注意到,如果我 实际上嵌套 nested_fmain_f 中,如下所示:

def main_f(x):
    def nested_f(x):
        return x + 1

    return nested_f(x) + 1

一切运行正常。然而,我的目标是在多个函数中很好地分离逻辑,我也可以单独测试。

认为这可以通过使用spark.sparkContext.addPyFile('...udfs.py')udfs.py 文件(或整个压缩文件夹)提交给执行者来解决。然而:

  1. 我觉得这有点啰嗦(尤其是如果您需要压缩文件夹等...)
  2. 这并不总是容易/可能的(例如,udfs.py 可能正在使用许多其他模块,这些模块也需要提交,从而导致一些连锁反应...)
  3. addPyFile 还存在其他一些不便(例如 autoreload can stop working 等)

所以问题是:有没有办法同时做所有这些:

  • 将 UDF 的逻辑很好地拆分为几个 Python 函数
  • 使用不同于定义逻辑的文件中的 UDF
  • 不需要使用addPyFile提交任何依赖项

澄清这是如何工作的/为什么这不起作用的奖励积分!

【问题讨论】:

  • 在 udfs.py 中将您的函数注册为 UDF。
  • 你试过了吗?我认为这行不通。
  • @Ferrad:它有效。在 udfs.py 中注册你的 UDF,然后在其他模块中导入你注册的 udf。

标签: python apache-spark pyspark user-defined-functions


【解决方案1】:

对于小的(一个或两个本地文件)依赖项,您可以使用 --py-files 并枚举它们,具有更大或更多的依赖项 - 最好将其打包在 zip 或 egg 文件中。

文件udfs.py

def my_function(*args, **kwargs):
    # code

文件main.py

from pyspark import SparkContext
from pyspark.sql import SparkSession
from pyspark.sql.functions import udf
from udfs import my_function

sc = SparkContext()
spark = SparkSession(sc)
my_udf = udf(my_function)

df = spark.createDataFrame([(1, "a"), (2, "b")])
df.withColumn("my_f", my_udf("..."))

运行:

pyspark --py-files /path/to/udfs.py
# or
spark-submit --py-files /path/to/udfs.py main.py

如果你已经写了自己的 Python 模块甚至是第三方模块(不需要 C 编译),我个人需要 geoip2,最好是创建一个 zip 或 egg 文件。

# pip with -t install all modules and dependencies in directory `src`
pip install geoip2 -t ./src
# Or from local directory
pip install ./my_module -t ./src

# Best is 
pip install -r requirements.txt -t ./src

# If you need add some additionals files
cp ./some_scripts/* ./src/

# And pack it
cd ./src
zip -r ../libs.zip .
cd ..

pyspark --py-files libs.zip
spark-submit --py-files libs.zip

使用pyspark --master yarn(可能与其他非本地主选项)时要小心,在带有--py-files的pyspark shell中:

>>> import sys
>>> sys.path.insert(0, '/path/to/libs.zip')  # You can use relative path: .insert(0, 'libs.zip')
>>> import MyModule  # libs.zip/MyModule

编辑 - 关于如何在没有 addPyFile ()--py-files 的情况下在执行程序上获取函数的问题的答案:

必须有一个给定的文件,其中包含各个执行程序的功能。并且可以通过 PATH 环境访问。 因此,我可能会编写一个 Python 模块,然后将其安装在执行器上并在环境中可用。

【讨论】:

  • 谢谢,一个有用的答案,虽然它不完全是我所追求的,似乎 --py-files 只是 CLI 相当于 addPyFile ( stackoverflow.com/a/38072930/1913724 ) 。可能我所要求的不存在,在这种情况下,最好知道原因!
  • @Ferrard - 关于如何在没有addPyFile ()--py-files 的情况下在执行程序上获取函数的问题的答案: 必须有一个给定的文件,其中包含个人的函数执行人。并且可以通过 PATH 环境访问。因此,我可能会编写一个 Python 模块,然后将其安装在执行器上并在环境中可用。
【解决方案2】:

也许尝试在一个类中组织你的方法,如下所示:

class temp_class:
    def nested_f(self, x):
      return x + 1

    def main_f(self, x):
      return self.nested_f(x) + 1

这可能行得通!

【讨论】:

    猜你喜欢
    • 2020-07-03
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2021-09-30
    • 2016-06-11
    • 1970-01-01
    相关资源
    最近更新 更多