【问题标题】:For Loop keeps restarting in EMR (pyspark)For Loop 在 EMR (pyspark) 中不断重启
【发布时间】:2021-02-19 11:31:45
【问题描述】:

我有一个嵌套的 for 循环,它在内循环中对一个数据帧执行 10 次操作,并在完成内循环后将生成的 10 个数据帧连接成一个数据帧。

更新: 我使用字典创建数据帧列表来存储每个操作,然后在内部循环结束时将它们合并。

然后它将其写入带有外循环迭代次数的镶木地板文件。 外循环有 6 次迭代,因此应该产生 6 个 parquet 文件。

它是这样的:

train=0
for i in range(0,6):
    train=train+30
    #For loop to aggregate input and create 10 output dataframes
    dfnames={}
    for j in range(0,10):
        ident="_"+str(j)  
        #Load dataframe of around 1M rows
        df=spark.read.parquet("s3://path")
        dfnames['df'+ident]= #Perform aggregations and operations
    #Combine the 10 datframes into a single df
    df_out=df_1.uniionByName(d_2).unionByName(df_3)...unionByName(df_10)
    #Write to output parquet file
    df_out.write.mode('overwrite').parquet("s3://path/" + str(train) +".parquet"

在完成外循环的第三次迭代之前,它似乎工作正常。然后由于某种原因,它使用另一个尝试 id 重新启动循环。 所以我得到了前 3 个文件,但它没有进入第 4 次迭代,而是重新启动以重新提供第一个文件。我没有遇到任何失败的阶段或工作。

我尝试使用虚拟变量和打印语句单独运行 for 循环(不加载大型数据帧等),它们可以正常工作以完成。 我认为这与循环后刷新内存的方式有关。

这些是我的 EMR Spark 运行条件: 我在一个 EMR 集群上运行它,它有 5 个执行器、5 个驱动程序节点和 10 个实例,总共有 50 个内核。 spark执行器和驱动内存各45G,一共约583G。 典型的shuffle read是250G,shuffle write是331G。

一些相关的Spark环境变量如下所示:

在循环或内存管理方面我做错了什么吗? 任何见解将不胜感激!

【问题讨论】:

    标签: apache-spark pyspark memory-leaks nested-loops amazon-emr


    【解决方案1】:

    尽量不要将 Python 数据结构与 Spark 数据结构结合起来。

    您希望将 for 循环转换为 ma​​p-reduce、foreach 设计形式。

    除此之外,您还可以在每次迭代中创建一个缓存/火花检查点,以避免从头开始重新运行整个 DAG。

    缓存您的数据:

    df.cache()
    

    用于检查点

    spark.sparkContext.setCheckpointDir('<some path>')
    df.checkpoint()
    

    一旦您使用 spark 构造而不是 python 构造,这些将显示性能和规模改进。例如,用 foreach 替换你的 for 循环,用 map reduce 替换列表的联合。

    【讨论】:

    • Python 数据结构是指dfnames[] 字典吗?如何创建火花检查点?
    • 是的,dfnames 是一个字典,列表、集合等都是 python 数据结构,而 RDD 和 DataFrames 是 PySpark 数据结构。
    • 所以我查看了foreach(),它似乎只适用于 RDD。我不能在数据帧上使用它。还有其他方法可以替代 for 循环吗?
    【解决方案2】:

    你是怎么得到你的 df1, df2... 在这条线之前的?

    #Combine the 10 datframes into a single df df_out=df1.uniionByName(d2).unionByName(df3)...unionByName(df10)

    我的猜测是,您的数据框计划越来越大,这可能会导致问题。

    我建议在内部循环中创建一个数据帧列表并使用reduce 方法将它们合并。

    如下所示

    from functools import reduce
    from pyspark.sql import DataFrame
    df_list = []
    for j in range(0,10):  
            #Load dataframe of around 1M rows
            df = spark.read.parquet("s3://path")
            transformed_df = #do your transforms
            df_list.append(transformed_df)
    
    final_df = reduce(DataFrame.unionByName, df_list)
    
    

    【讨论】:

    • 我更新了我的帖子以显示该部分。我使用字典 dfnames 将生成的 10 个数据帧存储为 df_1df_2、df_3` 等。然后是 unionByName。这 10 个数据帧中的每一个都是独一无二的,我想在内部循环之后简单地合并它们。 “reduce”方法会以任何方式修改内容吗? append 函数也不会将附加 dfs 的内容保留在单个节点中,而不是分布式?
    • reduce 方法不会修改内容。它只是将其参数中传递的特定函数应用于所有列表元素。 append 只是将数据框附加到列表中,而数据框只是计划,它们不包含任何数据。基本上,您的列表将引用每次迭代的每个转换。像这样[DataFrame[comment: bigint, inp_col: string, input_val: string], DataFrame[comment: bigint, inp_col: string, input_val: string], DataFrame[comment: bigint, inp_col: string, input_val: string]]
    • 我尝试了append 方法。但是我遇到了内存堆错误:(
    猜你喜欢
    • 2019-04-01
    • 2022-11-17
    • 1970-01-01
    • 1970-01-01
    • 2014-05-19
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多