【发布时间】:2020-05-10 00:10:46
【问题描述】:
我在 Pyspark 上使用 Pandas UDF。
我有一个主文件 __main_.py,其中包含:
from pyspark.sql import SparkSession
from run_udf import compute
def main():
spark = SparkSession.builder.getOrCreate()
df = spark.createDataFrame(
[(1, 1.0), (1, 2.0), (2, 3.0), (2, 5.0), (2, 10.0)],
("id", "v"))
df = compute(df)
df.show()
spark.stop()
if __name__ == "__main__":
main()
还有一个 run_udf.py 文件,其中包含我的 UDF 函数和另一个函数(将单个变量乘以 2):
from pyspark.sql.functions import pandas_udf, PandasUDFType
def multi_by_2(x):
return 2 * x
def compute(df):
@pandas_udf("id long, v double", PandasUDFType.GROUPED_MAP)
def subtract_mean(pdf):
# pdf is a pandas.DataFrame
v = pdf.v
return pdf.assign(v=multi_by_2(v) - v.mean())
df = df.groupby("id").apply(subtract_mean)
return df
通过运行 main.py,我收到以下错误:“没有名为 'run_udf' 的模块”。 在此配置中,subtract_mean() 似乎无法访问函数 multi_by_2()。我找到了 2 种方法,但不知道它是否符合最佳实践标准:
方法 1:(将函数移动到计算内部 - 不理想,因为我每次使用另一个 pandas_udf() 函数时都会复制该函数 - 我们失去了“可重用”函数的概念) .
def compute(df):
def multi_by_2(x):
return 2 * x
@pandas_udf("id long, v double", PandasUDFType.GROUPED_MAP)
def subtract_mean(pdf):
# pdf is a pandas.DataFrame
v = pdf.v
return pdf.assign(v=multi_by_2(v) - v.mean())
df = df.groupby("id").apply(subtract_mean)
return df
方法二:将乘法函数作为compute的参数传入。
__main_.py
from pyspark.sql import SparkSession
from run_udf import compute
def multi_by_2(x):
return 2 * x
def main():
spark = SparkSession.builder.getOrCreate()
df = spark.createDataFrame(
[(1, 1.0), (1, 2.0), (2, 3.0), (2, 5.0), (2, 10.0)],
("id", "v"))
df = compute(df, multi_by_2)
df.show()
spark.stop()
if __name__ == "__main__":
main()
run_udf.py from pyspark.sql.functions import pandas_udf, PandasUDFType
def compute(df, multi_by_2):
@pandas_udf("id long, v double", PandasUDFType.GROUPED_MAP)
def subtract_mean(pdf):
# pdf is a pandas.DataFrame
v = pdf.v
return pdf.assign(v=multi_by_2(v) - v.mean())
df = df.groupby("id").apply(subtract_mean)
return df
我发现的两个解决方案似乎有点老套。有没有更好的方法来解决这个问题?
【问题讨论】:
标签: python pyspark user-defined-functions