【问题标题】:Can reduced parallelism lead to no shuffle spill?减少并行性可以导致没有洗牌溢出吗?
【发布时间】:2019-11-07 17:37:41
【问题描述】:

举个例子:

我有一个包含 5 个节点的集群,每个节点有 64 个内核和 244 GB 内存。

我决定在每个节点上运行 3 个 executor,并设置 executor-cores 为 21,executor 内存为 80GB,这样每个 executor 可以并行执行 21 个任务。现在考虑 315(63 * 5) 个数据分区,其中 314 个分区大小为 3GB,但其中一个为 30GB(由于数据倾斜)。

所有收到 3GB 分区的 executor 有 63GB(21 * 3 = 因为每个 executor 可以并行运行 21 个任务,每个任务占用 3GB 内存空间)占用。

但是收到 30GB 分区的一个执行程序将需要 90GB(20 * 3 + 30) 内存。那么这个执行器会先执行 20 个 3GB 的任务然后加载 30GB 的任务,还是会尝试加载 21 个任务并发现对于一个任务它必须溢出到磁盘?如果我将 executor-cores 设置为 15,那么接收 30 GB 分区的 executor 将只需要 14 * 3 + 30 = 72 gb,因此不会溢出到磁盘。

那么在这种情况下,减少并行性会导致没有 shuffle 溢出吗?

【问题讨论】:

    标签: apache-spark


    【解决方案1】:

    @Venkat Dabri,

    您能否用适当的回车/空格格式化问题?

    这里有几个指针

    Spark (Shuffle)Map Stage ==> 每个分区的大小取决于文件系统的块大小。例如。如果从 HDFS 读取数据,每个分区将尝试使数据接近 128MB,因此对于输入数据,分区数 = floor(文件数 * blocksize/128(实际上使用的是 122.07 兆字节) )

    现在你描述的场景是 Shuffled data in Reducer(Result Stage)

    这里,reducer 任务处理的 blocks 称为 Shuffled Blocks,默认情况下 Spark(用于 SQL/Core API)将启动 200 reducer任务

    重要的是要记住,Spark 最多可以容纳 2GB,所以如果您的分区太很少,而其中一个分区执行 远程提取 shuffle block > 2GB,你会看到类似Size exceeds Integer.MAX_VALUE的错误

    为了缓解这种情况,Spark 在默认限制内采用了许多优化(压缩/tungsten-sort-shuffle 等),但作为开发人员,我们可以尝试智能地重新分区倾斜数据并调整默认并行度

    【讨论】:

    • 我知道你上面说的。这个答案没有回答我的问题。可以使用重新分区来设置输入分区。同样,可以使用 spark.sql.shuffle.partitions 相应地设置每个阶段的分区。所以我没有使用你在回复中建议的默认值。
    猜你喜欢
    • 2016-07-24
    • 1970-01-01
    • 1970-01-01
    • 2014-08-07
    • 1970-01-01
    • 1970-01-01
    • 2022-11-10
    • 2019-04-15
    • 1970-01-01
    相关资源
    最近更新 更多