【发布时间】:2020-09-16 20:51:58
【问题描述】:
def fun_1(csv):
# returns int[] of length = Number of New Lines in String csv
def fun_2(csv): # My WorkArround to Pass one CSV Line at One Time
return fun_1(csv)[0]
输入数据框是df
+----+----+-----+
|col1|col2|CSVs |
+----+----+-----+
| 1| a|2,0,1|
| 2| b|2,0,2|
| 3| c|2,0,3|
| 4| a|2,0,1|
| 5| b|2,0,2|
| 6| c|2,0,3|
| 7| a|2,0,1|
+----+----+-----+
下面是一个有效但需要很长时间的代码片段
from pyspark.sql.functions import udf
from pyspark.sql import functions as sf
funudf = udf(fun_2) # wish it could be fun_1
df=df.withColumn( 'pred' , funudf(sf.col('csv')))
fun_1 ,存在内存问题,一次最多只能处理 50000 行。我希望使用 funudf = udf(fun_1) 。
因此,如何将 PySpark DF 拆分为 50000 行的段,调用 funudf ->fun_1。
输出有两个列,来自输入的“col1”和“funudf 返回值”。
【问题讨论】:
-
您在运行
udf(fun_1)之前是否尝试过重新分区数据?您看到的内存问题到底是什么? -
呼叫转到另一个计算具有挑战性的内存饥饿的服务。
-
如果我理解正确,
fun_1会调用此服务,该服务对 CSV 执行一些复杂的操作,并且会占用内存。通常,减少内存需求的最简单解决方案是使用随机密钥进行重新分区。它将默认为 200 个分区。因此,您可以在运行 UDF 之前尝试df=df.repartition(800, 'some_key')。确保在fun_1之前运行count之类的操作,因为repartition是惰性的。
标签: python pyspark user-defined-functions pyspark-dataframes