【发布时间】:2016-06-15 23:40:42
【问题描述】:
我有一个 2TB 内存集群和一个 1TB 数据集(在磁盘上)。任务是将用户会话连接在一起以进行进一步分析。 (略简化)代码如下:
dataset.flatMap(
get_key_val_from_json # gives (key, [json_string])
).reduceByKey(
add # merge lists
).map(
lambda x: Row(k=x[0], v='\t'.join(x[1])) # convert the list to a string
).toDF(
['key', 'events_with_tab_sep'] # convert to DataFrame so could save as parquet
).write.mode(
'overwrite'
).parquet(
some_path
)
但是,在reduceByKey 步骤,作业无法继续,因为执行程序开始失败(显示很多 CANNOT FIND ADDRESS 错误)并重新启动。作业最终会抛出
org.apache.spark.SparkException:作业中止。
因为
作业 X 无法完成,因为阶段 Y 失败了 4 次。
reduceByKey 的 shuffle 过程中似乎存在 OOM 问题?但是,如果我提取、排序和归约,我认为此任务与内存无关。 Spark 是否需要将整个数据集加载到内存中进行 reduce 操作?
试过但没用
1-要求Spark仅将中间文件持久化到本地磁盘,即
extracted = dataset.flatMap(
get_key_val_from_json
).persist(
StorageLevel.DISK_ONLY
)
extracted.reduceByKey(
add
).map(
lambda x: Row(k=x[0], v='\t'.join(x[1]))
).toDF(
['key', 'events_with_tab_sep']
).write.mode(
'overwrite'
).parquet(
some_path
)
【问题讨论】:
-
好吧,你在
reduceByKey中应用了非归约操作,这几乎是另一个groupByKey。在最坏的情况下,只有当所有数据都适合为单个执行程序分配的内存时,这才有效。 -
我知道最坏的情况是所有 1TB 数据都具有相同的密钥。在这里,我知道最长的事件列表约为 100KB,因此将它们连接起来应该不是问题。
-
我很困惑,如果在第一个
map之后有一个sort,那么reduceByKey操作不需要依赖于内存中的完整数据集。但似乎并非如此。 -
关于这个问题的一个很好的讨论是here
标签: apache-spark mapreduce pyspark spark-dataframe