【问题标题】:Why Spark repartition leads to MemoryOverhead?为什么 Spark 重新分区会导致 MemoryOverhead?
【发布时间】:2018-06-28 04:40:16
【问题描述】:

所以问题在主题中。我想我没有正确理解重新分区的工作。在我的脑海中,当我说somedataset.repartition(600) 时,我希望所有数据都将按相同大小在工人之间进行分区(比如说 60 名工人)。

例如。我会在不平衡的文件中加载大量数据,比如说 400 个文件,其中 20% 是 2Gb 大小,其他 80% 是大约 1 Mb。我有加载这些数据的代码:

val source = sparkSession.read.format("com.databricks.spark.csv")
  .option("header", "false")
  .option("delimiter","\t")
  .load(mypath)

我想将原始数据转换为我的中间对象,过滤不相关的记录,转换为最终对象(带有附加属性),然后按一些列分区并写入镶木地板。在我看来,平衡工作人员之间的数据(40000 个分区)似乎是合理的,而不是像这样工作:

val ds: Dataset[FinalObject] = source.repartition(600)
  .map(parse)
  .filter(filter.IsValid(_))
  .map(convert)
  .persist(StorageLevel.DISK_ONLY)
val count = ds.count
log(count)
val partitionColumns = List("region", "year", "month", "day")

ds.repartition(partitionColumns.map(new org.apache.spark.sql.Column(_)):_*)
  .write.partitionBy(partitionColumns:_*)
  .format("parquet")
  .mode(SaveMode.Append)
  .save(destUrl)

但它失败了

ExecutorLostFailure(执行器 7 退出导致其中一个正在运行 任务)原因:容器因超出内存限制而被 YARN 杀死。 使用了 34.6 GB 的 34.3 GB 物理内存。考虑提升 spark.yarn.executor.memoryOverhead。

当我不重新分区时,一切都很好。哪里不明白repartition正确吗?

【问题讨论】:

    标签: scala apache-spark partitioning


    【解决方案1】:

    您的逻辑对于repartitionpartitionBy 都是正确的,但在使用repartition 之前,您需要记住这件事来自多个来源。

    请记住,重新分区数据的成本相当高 手术。 Spark 还有一个优化版本的 repartition(),称为 coalesce() 允许避免数据移动,但前提是你是 减少 RDD 分区的数量。

    如果您希望您的任务必须完成,请增加驱动程序和执行程序的内存

    【讨论】:

    • 那么什么时候值得 ro 重新分区输入数据呢?当输入数据分区计数非常小时(比如几个非常大的文件)。
    • 这取决于要求。如需更多说明,请参阅 - stackoverflow.com/questions/31610971/…
    猜你喜欢
    • 2018-11-14
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2015-07-25
    • 2015-01-28
    • 2016-09-24
    • 2020-05-17
    相关资源
    最近更新 更多