本文参考:

a. https://www.jianshu.com/p/c46bfaa5dd15

1. shuffle及历史简介

shuffle,即"洗牌",所有采用map-reduce思想的大数据计算框架的必经及最重要的阶段。顾名思义,其处于map与reduce之间,可分为2个子阶段:

a. shuffle write: map任务(spark中为ShuffleMapTask)写上游计算产生的中间数据

b. shuffle read: reduce任务(spark中为ResultTask)读取map任务产生的中间数据,用于下游计算或处理

下图示出Hadoop MapReduce框架中,shuffle发生的机制与细节。

Spark2.4源码阅读1-Shuffle机制概述

 

 图中可以看出:map任务阶段输入数据经分区、排序以及溢写磁盘(生成临时文件),然后在磁盘上进行合并(一个map输出一个文件),而reduce任务将获取并合并多个map任务产生的文件,进行后续处理。

Spark的shuffle机制虽然也采用MR思想,但Spark是基于RDD进行计算的,实现方式与Hadoop有差异,并且中途经历了比较大的变动,简述如下:

a. 在久远的Spark 0.8版本及之前,只有最简单的hash shuffle,后来引入了consolidation机制

b. 1.1版本新加入sort shuffle机制,但默认仍然使用hash shuffle

c. 1.2版本开始默认使用sort shuffle

d. 1.4版本引入了tungsten-sort shuffle,是基于普通sort shuffle创新的序列化shuffle方式

e. 1.6版本将tungsten-sort shuffle与sort shuffle合并,由Spark自动决定采用哪一种方式

f. 2.0版本之后,hash shuffle机制被删除,只保留sort shuffle机制至今

下面的代码分析致力于对Spark shuffle先有一个大致的了解。

2. shuffle机制的最顶层: ShuffleManager特征

(1) ShuffleManager的创建位置

shuffle机制的初始化是在Spark执行环境初始化时执行。在创建driver端环境(SparkContext中)和executor端环境(ExecutorBackend)时,会进行创建。主要调用org.apache.spark.SparkEnv的create方法,来构建ShuffleManager。其主要代码如下:

    // Let the user specify short names for shuffle managers
    // 注意sort, tungsten-sort对应的value均是SortShuffleManager
    val shortShuffleMgrNames = Map(
      "sort" -> classOf[org.apache.spark.shuffle.sort.SortShuffleManager].getName,
      "tungsten-sort" -> classOf[org.apache.spark.shuffle.sort.SortShuffleManager].getName)
    // 通过配置文件获取manager类名
    val shuffleMgrName = conf.get("spark.shuffle.manager", "sort")
    // 获取shuffle manager对应的类名,包括SortShuffleManager或从配置中获取的
    val shuffleMgrClass =
      shortShuffleMgrNames.getOrElse(shuffleMgrName.toLowerCase(Locale.ROOT), shuffleMgrName)
    // 通过反射构建ShuffleManager实例
    val shuffleManager = instantiateClass[ShuffleManager](shuffleMgrClass)

 上面的代码中,可以看到已经没有了hash shuffle的存在。此外,也可以通过spark.shuffle.manager参数来指定第三方或其他方式实现的shuffle机制

(2) ShuffleManager特征

ShuffleManager是一个scala特征。其代码如下:

package org.apache.spark.shuffle

import org.apache.spark.{ShuffleDependency, TaskContext}

/**
 * Shuffle系统的可插拔接口。ShuffleManager的创建基于spark.shuffle.manager设置,
 * 且在SparkEnv中的driver和每个executor上创建。driver在该shuffle manager上注册shuffle,executor(或者driver中本地运行的任务)
 * 能够请求读取或写入数据。
 * 
 * 注意:shuffle manager会在SparkEnv中实例化,因此其构造函数可将SparkConf和boolean isDriver作为参数
 */
private[spark] trait ShuffleManager {

  /**
   * 使用ShuffleManager注册一个shuffle,获取一个句柄,以传到任务中
   */
  def registerShuffle[K, V, C](
      shuffleId: Int,
      numMaps: Int,
      dependency: ShuffleDependency[K, V, C]): ShuffleHandle

  /** 为一个指定的分区提供ShuffleWriter,在map任务期间的executor调用 */
  def getWriter[K, V](handle: ShuffleHandle, mapId: Int, context: TaskContext): ShuffleWriter[K, V]

  /**
   * 为一个范围(开始分区到结束分区-1, 闭区间)内的reduce分区提供ShuffleReader。在reduce任务期间的executor调用
   */
  def getReader[K, C](
      handle: ShuffleHandle,
      startPartition: Int,
      endPartition: Int,
      context: TaskContext): ShuffleReader[K, C]

  /**
   * 从ShuffleManager移除shuffle的元数据
   * @return 移除成功返回true, 否则为false.*/
  def unregisterShuffle(shuffleId: Int): Boolean

  /**
   * 返回一个能够基于block坐标位置获取shuffle块数据的解析器
   */
  def shuffleBlockResolver: ShuffleBlockResolver

  /** Shut down this ShuffleManager. */
  def stop(): Unit
}

   其中:

a. registerShuffle方法用于注册一种shuffle机制,并返回对应的ShuffleHandle,handle内会存储shuffle依赖信息。根据该handle类型可以进一步确定采用ShuffleWriter类型

b. getWriter方法用于获取ShuffleWriter,它是executor执行map任务时调用的

c. getReader方法用于获取ShuffleReader,它是executor执行reduce任务时调用的

d. unregisterShuffle用于取消shuffle的注册

e. shuffleBlockResolver用于获取能够定位shuffle块的解析器

3. SortShuffleManager

hash shuffle取消之后,org.apache.spark.shuffle.sort.SortShuffleManager就是ShuffleManager目前唯一的实现类。

(1) registerShuffle方法

/**
    * 获取一个ShuffleHandle传入到tasks.
    */
  override def registerShuffle[K, V, C](
                                         shuffleId: Int,
                                         numMaps: Int,
                                         dependency: ShuffleDependency[K, V, C]): ShuffleHandle = {
    if (SortShuffleWriter.shouldBypassMergeSort(conf, dependency)) {
      // 如果分区数小于spark.shuffle.sort.bypassMergeThreshold(默认为200)并且不需要map端的聚合,则可以直接写对应分区个数的
      // 文件,且在结束时仅仅合并它们。可以避免2次序列化和反序列化操作以将溢写的文件合并在一起(正常的代码方法会发生这种情况)。
      // 其缺点是一次打开多个文件,因此需要分配缓冲区更多的内存。
      new BypassMergeSortShuffleHandle[K, V](
        shuffleId, numMaps, dependency.asInstanceOf[ShuffleDependency[K, V, V]])
    } else if (SortShuffleManager.canUseSerializedShuffle(dependency)) {
      // 否则,尝试以序列化形式缓存map输出,且这样做更高效(需要满足一定条件)
      new SerializedShuffleHandle[K, V](
        shuffleId, numMaps, dependency.asInstanceOf[ShuffleDependency[K, V, V]])
    } else {
      // 否则,以反序列化形式缓存map输出
      new BaseShuffleHandle(shuffleId, numMaps, dependency)
    }
  }

   可以看出,根据条件的不同,返回不同的handle。下面依次分析3中shuffle机制。

1)  检查是否满足SortShuffleWriter.shouldBypassMergeSort条件

private[spark] object SortShuffleWriter {
  def shouldBypassMergeSort(conf: SparkConf, dep: ShuffleDependency[_, _, _]): Boolean = {
    // 如果需要map端聚合,则不能跳过排序
    if (dep.mapSideCombine) {
      false
    } else {
      val bypassMergeThreshold: Int = conf.getInt("spark.shuffle.sort.bypassMergeThreshold", 200)
      dep.partitioner.numPartitions <= bypassMergeThreshold
    }
  }
}

   代码中表示,如果同时满足一下两个条件,那么会返回ByPassMergeSortShuffleHandle,将启用bypass merge-sort机制

a. 该shuffle依赖中没有map端聚合操作(如groupByKey)

b. 依赖分区数不大于spark.shuffle.sort.bypassMergeThreshold规定的值,默认值为200

2) 如果不启用bypass机制,继续检查是否符合SortShuffleManager.canUseSerializedShuffle方法

/**
    * 一个帮助类,用于决定shuffle应该使用一个优化的序列化shuffle方法,还是应该退回到操作反序列化对象的原始方法
    */
  def canUseSerializedShuffle(dependency: ShuffleDependency[_, _, _]): Boolean = {
    val shufId = dependency.shuffleId
    val numPartitions = dependency.partitioner.numPartitions
    if (!dependency.serializer.supportsRelocationOfSerializedObjects) {
      log.debug(s"Can't use serialized shuffle for shuffle $shufId because the serializer, " +
        s"${dependency.serializer.getClass.getName}, does not support object relocation")
      false
    } else if (dependency.mapSideCombine) {
      log.debug(s"Can't use serialized shuffle for shuffle $shufId because we need to do " +
        s"map-side aggregation")
      false
    } else if (numPartitions > MAX_SHUFFLE_OUTPUT_PARTITIONS_FOR_SERIALIZED_MODE) {
      log.debug(s"Can't use serialized shuffle for shuffle $shufId because it has more than " +
        s"$MAX_SHUFFLE_OUTPUT_PARTITIONS_FOR_SERIALIZED_MODE partitions")
      false
    } else {
      log.debug(s"Can use serialized shuffle for shuffle $shufId")
      true
    }
  }

   可以看出,如果同时满足3个条件,就会返回SerializedShuffleHandle,启用序列化Sort shuffle机制(也就是tungsten-sort):

a. 使用的序列化器支持序列化对象的重定位(KyroSerializer)

b. shuffle依赖中map端无聚合操作

c. 分区数不大于MAX_SHUFFLE_OUTPUT_PARTITIONS_FOR_SERIALIZED_MODE的值,即2^24

3) 既不用bypass,也不用tungsten-sort,那么就返回默认的BaseShuffleHandle,采用基本的sort handle

(2) getWriter方法

基于不同的handle,获取相应的ShuffleWriter。

/** 为指定的分区获取ShuffleWriter,在executor执行map任务时调用 */
  override def getWriter[K, V](
                                handle: ShuffleHandle,
                                mapId: Int,
                                context: TaskContext): ShuffleWriter[K, V] = {
    numMapsForShuffle.putIfAbsent(
      handle.shuffleId, handle.asInstanceOf[BaseShuffleHandle[_, _, _]].numMaps)
    val env = SparkEnv.get
    handle match {
      case unsafeShuffleHandle: SerializedShuffleHandle[K @unchecked, V @unchecked] =>
        new UnsafeShuffleWriter(
          env.blockManager,
          shuffleBlockResolver.asInstanceOf[IndexShuffleBlockResolver],
          context.taskMemoryManager(),
          unsafeShuffleHandle,
          mapId,
          context,
          env.conf)
      case bypassMergeSortHandle: BypassMergeSortShuffleHandle[K @unchecked, V @unchecked] =>
        new BypassMergeSortShuffleWriter(
          env.blockManager,
          shuffleBlockResolver.asInstanceOf[IndexShuffleBlockResolver],
          bypassMergeSortHandle,
          mapId,
          context,
          env.conf)
      case other: BaseShuffleHandle[K @unchecked, V @unchecked, _] =>
        new SortShuffleWriter(shuffleBlockResolver, other, mapId, context)
    } 

对于tungsten-sort会使用UnsafeShuffleWriter,对于bypass会使用BypassMergeSortShuffleWriter,普通的sort则使用SortShuffleWriter。都继承于ShuffleWriter抽象类,且实现了write方法。

(3) getReader方法

  /**
    * 为一个范围(开始分区到结束分区-1, 闭区间)内的reduce分区提供ShuffleReader。在executor执行reduce任务时调用
    */
  override def getReader[K, C](
                                handle: ShuffleHandle,
                                startPartition: Int,
                                endPartition: Int,
                                context: TaskContext): ShuffleReader[K, C] = {
    new BlockStoreShuffleReader(
      handle.asInstanceOf[BaseShuffleHandle[K, _, C]], startPartition, endPartition, context)
  } 

ShuffleReader与ShuffleWriter不同,只有一种实现类,即BlockStoreShuffleReader。它继承自ShuffleReader特征,并实现了read方法。

4. 总结 

ShuffleManager的简易类图如下:

Spark2.4源码阅读1-Shuffle机制概述

5. 附录

(1) SortShuffleManager的完整阅读代码

package org.apache.spark.shuffle.sort

import java.util.concurrent.ConcurrentHashMap

import org.apache.spark._
import org.apache.spark.internal.Logging
import org.apache.spark.shuffle._

/**
  * 基于排序的shuffle中, 输入数据根据它们的分区id进行排序, 然后写入到一个单独的map输出文件。Reducers获取此文件的
  * 连续区域以读取它们的map输出部分。当遇到输出文件过大而内存无法满足时,该output的排序后的子集将溢写磁盘,并且溢写磁盘
  * 的文件将被合并,以产生最终的输出文件。
  *
  * 基于排序的shuffle有2种不同的写方法来产生其map输出文件:
  * 1. 序列化排序:当满足如下3个条件时可以使用
  * a. shuffle依赖不指定任何聚合(aggregation)或输出排序
  * b. shuffle的序列化器(serializer)支持序列化值的重定位(当前KyroSerializer和Spark SQL中自定义的序列化器满足)
  * c. shuffle产生的输出分区数需小于2^24个
  * 2.发序列化排序:用于处理其他情况
  * -----------------------
  * 序列化排序方法
  * -----------------------
  * 在序列化排序模式中,输入数据一经传入shuffle writer即被序列化,并且排序过程中以序列化的形式缓存。这种写方法实现了
  * 如下几种优化:
  * a.其排序作用于序列化的二进制数据,而非Java对象,可以减小内存占用和GC开销。这种优化需要数据序列化器具有某些属性,以允许
  *   序列化的数据无需反序列化就可以被重排序。了解更多细节,可查看SPARK-4550
  * b.其使用一种特殊的高效缓存排序器ShuffleExternalSorter,对压缩数据的指针和分区Id的数组进行排序。排序数组中每条记录仅
  *   使用8 byte,使得更多的数组适于缓存
  * c.溢出合并流程作用于属于同一个分区的序列化记录的block上,并且在合并期间无需反序列化记录
  * d.当溢出压缩编码支持压缩数据的拼接(concatenation)时,溢出合并仅仅拼接序列化且压缩的溢出分区,以产生最终的输出分区。
  *   此过程允许使用有效的数据拷贝方法,例如NIO的transferTo,而且避免合并期间分配解压缩或拷贝缓存的需要。
  */
private[spark] class SortShuffleManager(conf: SparkConf) extends ShuffleManager with Logging {

  // Spark1.6+之后,已强制溢写磁盘
  if (!conf.getBoolean("spark.shuffle.spill", true)) {
    logWarning(
      "spark.shuffle.spill was set to false, but this configuration is ignored as of Spark 1.6+." +
        " Shuffle will continue to spill to disk when necessary.")
  }

  /**
    * shuffle id到为这些shuffle产生输出的mapper数量的映射
    */
  private[this] val numMapsForShuffle = new ConcurrentHashMap[Int, Int]()

  // 创建并维护逻辑block和物理文件位置之间的shuffle block映射关系
  override val shuffleBlockResolver = new IndexShuffleBlockResolver(conf)

  /**
    * 获取一个ShuffleHandle传入到tasks.
    */
  override def registerShuffle[K, V, C](
                                         shuffleId: Int,
                                         numMaps: Int,
                                         dependency: ShuffleDependency[K, V, C]): ShuffleHandle = {
    if (SortShuffleWriter.shouldBypassMergeSort(conf, dependency)) {
      // 如果分区数小于spark.shuffle.sort.bypassMergeThreshold(默认为200)并且不需要map端的聚合,则可以直接写对应分区个数的
      // 文件,且在结束时仅仅合并它们。可以避免2次序列化和反序列化操作以将溢写的文件合并在一起(正常的代码方法会发生这种情况)。
      // 其缺点是一次打开多个文件,因此需要分配缓冲区更多的内存。
      new BypassMergeSortShuffleHandle[K, V](
        shuffleId, numMaps, dependency.asInstanceOf[ShuffleDependency[K, V, V]])
    } else if (SortShuffleManager.canUseSerializedShuffle(dependency)) {
      // 否则,尝试以序列化形式缓存map输出,且这样做更高效(需要满足一定条件)
      new SerializedShuffleHandle[K, V](
        shuffleId, numMaps, dependency.asInstanceOf[ShuffleDependency[K, V, V]])
    } else {
      // 否则,以反序列化形式缓存map输出
      new BaseShuffleHandle(shuffleId, numMaps, dependency)
    }
  }

  /**
    * 为一个范围(开始分区到结束分区-1, 闭区间)内的reduce分区提供ShuffleReader。在executor执行reduce任务时调用
    */
  override def getReader[K, C](
                                handle: ShuffleHandle,
                                startPartition: Int,
                                endPartition: Int,
                                context: TaskContext): ShuffleReader[K, C] = {
    new BlockStoreShuffleReader(
      handle.asInstanceOf[BaseShuffleHandle[K, _, C]], startPartition, endPartition, context)
  }

  /** 为指定的分区获取ShuffleWriter,在executor执行map任务时调用 */
  override def getWriter[K, V](
                                handle: ShuffleHandle,
                                mapId: Int,
                                context: TaskContext): ShuffleWriter[K, V] = {
    numMapsForShuffle.putIfAbsent(
      handle.shuffleId, handle.asInstanceOf[BaseShuffleHandle[_, _, _]].numMaps)
    val env = SparkEnv.get
    handle match {
      case unsafeShuffleHandle: SerializedShuffleHandle[K @unchecked, V @unchecked] =>
        new UnsafeShuffleWriter(
          env.blockManager,
          shuffleBlockResolver.asInstanceOf[IndexShuffleBlockResolver],
          context.taskMemoryManager(),
          unsafeShuffleHandle,
          mapId,
          context,
          env.conf)
      case bypassMergeSortHandle: BypassMergeSortShuffleHandle[K @unchecked, V @unchecked] =>
        new BypassMergeSortShuffleWriter(
          env.blockManager,
          shuffleBlockResolver.asInstanceOf[IndexShuffleBlockResolver],
          bypassMergeSortHandle,
          mapId,
          context,
          env.conf)
      case other: BaseShuffleHandle[K @unchecked, V @unchecked, _] =>
        new SortShuffleWriter(shuffleBlockResolver, other, mapId, context)
    }
  }

  /** 从ShuffleManager移除shuffle的元数据. */
  override def unregisterShuffle(shuffleId: Int): Boolean = {
    Option(numMapsForShuffle.remove(shuffleId)).foreach { numMaps =>
      (0 until numMaps).foreach { mapId =>
        shuffleBlockResolver.removeDataByMap(shuffleId, mapId)
      }
    }
    true
  }

  /** 关闭ShuffleManager. */
  override def stop(): Unit = {
    shuffleBlockResolver.stop()
  }
}


private[spark] object SortShuffleManager extends Logging {

  /**
    *  当以序列化形式缓冲map输出时,SortShuffleManager支持的最大shuffle输出分区数。
    *  这是这一种极端防御性编程措施,单个shuffle不能产生超过1600W个输出分区
    * */
  val MAX_SHUFFLE_OUTPUT_PARTITIONS_FOR_SERIALIZED_MODE =
    PackedRecordPointer.MAXIMUM_PARTITION_ID + 1

  /**
    * 一个帮助类,用于决定shuffle应该使用一个优化的序列化shuffle方法,还是应该退回到操作反序列化对象的原始方法
    */
  def canUseSerializedShuffle(dependency: ShuffleDependency[_, _, _]): Boolean = {
    val shufId = dependency.shuffleId
    val numPartitions = dependency.partitioner.numPartitions
    if (!dependency.serializer.supportsRelocationOfSerializedObjects) {
      log.debug(s"Can't use serialized shuffle for shuffle $shufId because the serializer, " +
        s"${dependency.serializer.getClass.getName}, does not support object relocation")
      false
    } else if (dependency.mapSideCombine) {
      log.debug(s"Can't use serialized shuffle for shuffle $shufId because we need to do " +
        s"map-side aggregation")
      false
    } else if (numPartitions > MAX_SHUFFLE_OUTPUT_PARTITIONS_FOR_SERIALIZED_MODE) {
      log.debug(s"Can't use serialized shuffle for shuffle $shufId because it has more than " +
        s"$MAX_SHUFFLE_OUTPUT_PARTITIONS_FOR_SERIALIZED_MODE partitions")
      false
    } else {
      log.debug(s"Can use serialized shuffle for shuffle $shufId")
      true
    }
  }
}

/**
  * BaseShuffleHandle的子类, 用于标识何时选择使用序列化shuffle
  */
private[spark] class SerializedShuffleHandle[K, V](
                                                    shuffleId: Int,
                                                    numMaps: Int,
                                                    dependency: ShuffleDependency[K, V, V])
  extends BaseShuffleHandle(shuffleId, numMaps, dependency) {
}

/**
  * BaseShuffleHandle的子类, 用于标识何时选择使用bypass merge sort shuffle
  */
private[spark] class BypassMergeSortShuffleHandle[K, V](
                                                         shuffleId: Int,
                                                         numMaps: Int,
                                                         dependency: ShuffleDependency[K, V, V])
  extends BaseShuffleHandle(shuffleId, numMaps, dependency) {
}
View Code

相关文章: