【发布时间】:2018-06-28 04:40:16
【问题描述】:
所以问题在主题中。我想我没有正确理解重新分区的工作。在我的脑海中,当我说somedataset.repartition(600) 时,我希望所有数据都将按相同大小在工人之间进行分区(比如说 60 名工人)。
例如。我会在不平衡的文件中加载大量数据,比如说 400 个文件,其中 20% 是 2Gb 大小,其他 80% 是大约 1 Mb。我有加载这些数据的代码:
val source = sparkSession.read.format("com.databricks.spark.csv")
.option("header", "false")
.option("delimiter","\t")
.load(mypath)
我想将原始数据转换为我的中间对象,过滤不相关的记录,转换为最终对象(带有附加属性),然后按一些列分区并写入镶木地板。在我看来,平衡工作人员之间的数据(40000 个分区)似乎是合理的,而不是像这样工作:
val ds: Dataset[FinalObject] = source.repartition(600)
.map(parse)
.filter(filter.IsValid(_))
.map(convert)
.persist(StorageLevel.DISK_ONLY)
val count = ds.count
log(count)
val partitionColumns = List("region", "year", "month", "day")
ds.repartition(partitionColumns.map(new org.apache.spark.sql.Column(_)):_*)
.write.partitionBy(partitionColumns:_*)
.format("parquet")
.mode(SaveMode.Append)
.save(destUrl)
但它失败了
ExecutorLostFailure(执行器 7 退出导致其中一个正在运行 任务)原因:容器因超出内存限制而被 YARN 杀死。 使用了 34.6 GB 的 34.3 GB 物理内存。考虑提升 spark.yarn.executor.memoryOverhead。
当我不重新分区时,一切都很好。哪里不明白repartition正确吗?
【问题讨论】:
标签: scala apache-spark partitioning