【发布时间】:2021-02-19 11:31:45
【问题描述】:
我有一个嵌套的 for 循环,它在内循环中对一个数据帧执行 10 次操作,并在完成内循环后将生成的 10 个数据帧连接成一个数据帧。
更新: 我使用字典创建数据帧列表来存储每个操作,然后在内部循环结束时将它们合并。
然后它将其写入带有外循环迭代次数的镶木地板文件。 外循环有 6 次迭代,因此应该产生 6 个 parquet 文件。
它是这样的:
train=0
for i in range(0,6):
train=train+30
#For loop to aggregate input and create 10 output dataframes
dfnames={}
for j in range(0,10):
ident="_"+str(j)
#Load dataframe of around 1M rows
df=spark.read.parquet("s3://path")
dfnames['df'+ident]= #Perform aggregations and operations
#Combine the 10 datframes into a single df
df_out=df_1.uniionByName(d_2).unionByName(df_3)...unionByName(df_10)
#Write to output parquet file
df_out.write.mode('overwrite').parquet("s3://path/" + str(train) +".parquet"
在完成外循环的第三次迭代之前,它似乎工作正常。然后由于某种原因,它使用另一个尝试 id 重新启动循环。 所以我得到了前 3 个文件,但它没有进入第 4 次迭代,而是重新启动以重新提供第一个文件。我没有遇到任何失败的阶段或工作。
我尝试使用虚拟变量和打印语句单独运行 for 循环(不加载大型数据帧等),它们可以正常工作以完成。 我认为这与循环后刷新内存的方式有关。
这些是我的 EMR Spark 运行条件: 我在一个 EMR 集群上运行它,它有 5 个执行器、5 个驱动程序节点和 10 个实例,总共有 50 个内核。 spark执行器和驱动内存各45G,一共约583G。 典型的shuffle read是250G,shuffle write是331G。
一些相关的Spark环境变量如下所示:
在循环或内存管理方面我做错了什么吗? 任何见解将不胜感激!
【问题讨论】:
标签: apache-spark pyspark memory-leaks nested-loops amazon-emr