【问题标题】:How can I avoid OOM issue while writing huge dataframes in orc format using PySpark?使用 PySpark 以 orc 格式编写大量数据帧时,如何避免 OOM 问题?
【发布时间】:2019-08-04 03:18:30
【问题描述】:

我有两个脚本:a 和 b。在脚本“a”中,将两个 CSV 文件读入两个数据帧,然后将其连接到一个生成的数据帧中,然后将其写入一个 CSV 文件。此任务不会导致 OOM 问题,而且速度非常快:10 亿行、100 列、每个 41.2 GB CSV 文件需要 8-9 分钟。

另一个脚本“b”在各个方面都与“a”相似,但有一个:书写格式。输入文件相同:1B 行、100 列、41.2 GB csv 文件。此脚本以 ORC 格式保存生成的数据框。然后它会导致错误:

An error occurred while calling o91.orc. Job aborted due to stage failure: Task 36 in stage 4.0 failed 4 times, most recent failure: Lost task 36.3 in stage 4.0 (TID 800, ip-*-*-*-*.ap-south-1.compute.internal, executor 10): ExecutorLostFailure (executor 10 exited caused by one of the running tasks) Reason: Container killed by YARN for exceeding memory limits. 5.6 GB of 5.5 GB physical memory used. Consider boosting spark.yarn.executor.memoryOverhead.

csv读取到orc的代码是:

import sys
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job
from pyspark.sql import DataFrameReader, DataFrameWriter
from datetime import datetime

import time

# @params: [JOB_NAME]
args = getResolvedOptions(sys.argv, ['JOB_NAME'])

sc = SparkContext()
glueContext = GlueContext(sc)
spark = glueContext.spark_session
job = Job(glueContext)
job.init(args['JOB_NAME'], args)

print("All imports were successful.")


df = spark.read.csv(
    's3://****',
    header=True
)
print("First dataframe read with headers set to True")

df2 = spark.read.csv(
    's3://****',
    header=True
)

print("Second data frame read with headers set to True")

# Obtain columns lists
left_cols = df.columns
right_cols = df2.columns

# Prefix each dataframe's field with "left_" or "right_"
df = df.selectExpr([col + ' as left_' + col for col in left_cols])
df2 = df2.selectExpr([col + ' as right_' + col for col in right_cols])

# Perform join
# df3 = df.alias('l').join(df2.alias('r'), on='l.left_c_0' == 'r.right_c_0')

# df3 = df.alias('l').join(df2.alias('r'), on='c_0')

df3 = df.join(
    df2,
    df["left_column_test_0"] == df2["right_column_test_0"]
)

print("Dataframes have been joined successfully.")
output_file_path = 's3://****

df3.write.orc(
    output_file_path
)

# print("Dataframe has been written to csv.")
job.commit()

我的csv文件是这样的:

0,1,2,3,4,.....99
1,2,3,4,......100
2,3,4,5,......101
.
.
.
.
[continues until the 1 billionth row]

如何确保我的代码不会导致任何 OOM 错误?

【问题讨论】:

  • 我不熟悉 ORC 格式,但我想最好的方法是逐行编写。这些方面的内容:with open('yourORCfile', 'wb') as f: for row in df3: f.write(row)
  • @Gio,你知道,这在使用 S3 时不起作用。它产生一个错误:no such file or directory.
  • 你确定路径写对了吗?在工作中我必须使用:with open('\\\servername\\userhome\\user\\folder\\asubfolder\\mydata.csv', 'r') as f:(这是读取文件)
  • 该路径尚不存在,将写入文件然后该路径将存在。这就是 S3 中的工作方式。这就是我到目前为止编写其他 orc 和 csv(任何文件)文件的方式。当我使用with open 时出现问题。
  • 对不起,我不熟悉那个=)。有没有办法通过写一个空文件然后填充它来创建路径?

标签: python python-3.x apache-spark dataframe pyspark


【解决方案1】:

为了从 OOM 问题中恢复,我不得不重新分区。这样做的逻辑是每个分区肯定会在 OOM 下(给定我的数据)。

代码如下: df3 = df3.repartition("left_column_test_0")

虽然对于 ORC 文件格式,spark 花费了更多时间:29 分钟。我仍在研究为什么 orc 的 spark 比 csv 慢的原因。

【讨论】:

    猜你喜欢
    • 2016-08-20
    • 2019-12-31
    • 1970-01-01
    • 2016-03-25
    • 2018-12-25
    • 1970-01-01
    • 2019-05-11
    • 2022-07-19
    • 1970-01-01
    相关资源
    最近更新 更多