Spark是以Shuffle作为Stage的分界的,Shuffle分为ShuffleWrite和ShuffleRead两个过程。

ShuffleWriter负责将中间结果写到磁盘上,可以理解为是当前Stage结束的收尾操作;最后ShuffleRead用于读取ShuffleWrite生成的Shuffle文件,可以理解为它是下一个Stage开始的那个操作

之前已经分析过三种ShuffleWrite的写方式了,现在来分析下ShuffleRead是怎么拉取磁盘上的数据的。

 

个人调试代码说明:

  • def main(args: Array[String]): Unit = {
      val sc = SparkHolder.getSparkContext()
    
      val rdd = sc.parallelize(1 to 4000, 4)
        .map(x => (x, List(x, x + 2, x + 1)))
        //      .sortByKey()
        .reduceByKey((a, b) => a.++(b), 8)
    
      val rdd1 = rdd.map(x => (x._1, x._2.head))
        .reduceByKey((a, b) => a + b)
    
      rdd1.count()
    }

 

提交Spark任务的命令如下:

/usr/lib/SPARK-**/bin/spark-submit --driver-java-options "-Xdebug -Xrunjdwp:transport=dt_socket,server=y,suspend=n,address=11123" trajectory-1.0-SNAPSHOT-jar-with-dependencies.jar

//Executor端调试设置

.set("spark.executor.extraJavaOptions","-XX:+HeapDumpOnOutOfMemoryError -XX:HeapDumpPath=/mnt/ssd1/spark/executor/dump -agentlib:jdwp=transport=dt_socket,server=y,suspend=y,address=11111")

 

代码的DAG图如下:

从reduceByKey执行过程分析ShuffleReader

 

reduceByKey算子介绍:

    从源码中可以看出,reduceByKey底层调用的是combineByKeyWithClassTag()方法:

从reduceByKey执行过程分析ShuffleReader

 

从reduceByKey执行过程分析ShuffleReader

解释一下上面4个参数:

createCombiner:组合器函数,用于将V类型转成C类型,即同一个Partition中,如果是新的key,调用这个函数进行处理

mergeValue:合并值函数,将一个C类型和一个V类型合并成C类型,即同一个Partition中,key已经遇到过,将当前值和以前的值进行合并

mergeCombiners:合并组合器函数,用于将两个C类型合并成一个C类型,即不同Partition中,同一个key的值调用该函数进行合并

parititioner:分区器,用于指定key reduce到哪一个分区中

 

 

从源码角度分析下ShuffleRead的流程:

    ShuffleRead是从ShuffleRDD.compute()方法开始的,所以调试的时候从这里开始看进去。

“ReduceTask”怎么知道在哪些节点上执行:

这里说的“ReduceTask”可以是ShuffleMapTask,也可以是ResultTask,个人称它为ReduceTask是因为它要拉取上一次ShuffleWrite写到磁盘上的数据,这个过程称之为Reduce(也可以称为ShuffleRead)。

本人是以standalone的方式执行,怎么知道ShuffleRead的任务在哪些节点执行呢?看下ShuffleRDD的getPreferredLocations()方法:

从reduceByKey执行过程分析ShuffleReader

内部是这样的:

从reduceByKey执行过程分析ShuffleReader

下面是源码中对这些参数的解释:

小于2000这里解释说是和HighlyCompressedMapStatus有关,目前还不太明白,但个人猜测是Spark建议你partition的数量不要设置的过多

0.2应该是个经验值,将它调大的话可以集中到更少的节点上,但是如果这些更少的节点都是busy的话,任务的执行会有延迟。

从reduceByKey执行过程分析ShuffleReader

 

执行Shuffle Read操作:

每个“ReduceTask”会拉取属于自己那个Partition的数据:

从reduceByKey执行过程分析ShuffleReader

核心是BlockStoreShuffleReader中的read()方法,接下来重点分析下该方法,该方法的主要流程如下:

  1. 初始化ShuffleBlockFetcherIterator,在初始化的过程中就已经获取到本地和远端的block数据了(详细流程再后面说,因为挺长的…)

  2. 对数据进行加密压缩,然后为获取到的数据分别构造inputStream

从reduceByKey执行过程分析ShuffleReader

    MaxBytesInFlight:单次请求默认最多获取48M数据,可以来自不同节点

    MaxReqsInFlight:单次默认最多发送Int.maxValue个请求

 

3.如果dep.addregator.isDefined,那么就对数据进行agg

    上面分析combineByKeyWithClassTag()是说过算子的作用,这里只是创建了一个ExternalAppendOnlyMap,然后将数据插入到这个map中进行agg。

   ExternalAppendOnlyMap可以看之前分析“SorteShuffleWriter”那一篇文章,里面提到过AppendOnlyMap。这里如果数据太多的话,也会先溢出到磁盘

从reduceByKey执行过程分析ShuffleReader

4.如果指定了dep.keyOrdering,那么就对数据进行排序

    对数据排序使用的是ExternalSorter,ExternalSorter也在那篇分析“SortShuffleWriter”那篇文章里面进行详细分析过。

   虽然之前说过,这里再提一下,这里是调用PartitionedPairBuffer的insert()方法,如果数据太多的话,也会刷新到磁盘生成临时文件。排序是在获取迭代器的时候进行的,使用的是TimSort(优化后的归并排序)。

从reduceByKey执行过程分析ShuffleReader

 

进一步详细分析下获取block的操作:

初始化ShuffleBlockFetcherIterator的过程中会调用它的initialize()方法,获取block数据就是在该方法中完成的:

从reduceByKey执行过程分析ShuffleReader

获取数据时,如果block在本地,则通过本地的BlockManager获取,如果block在远端,则通过ShuffleClient请求远程节点上的BlockTransferService获取。

 

区分远端与本地数据:

    这里会限制每次远程最多可以拉取maxBytesInFlight/5大小的数据(按照默认值就是48M/5=9.6M),注释中也说明了,这样是为了能够从多个节点并行进行拉取。

从reduceByKey执行过程分析ShuffleReader

遍历远端数据的信息,然后拼接拉取数据的请求(FetchRequest):

从reduceByKey执行过程分析ShuffleReader

 

获取远端数据:

    指定IP/Port/ExecutorId然后发送请求获取数据,然后将结果放入result队列,result保存的就是拉取到的数据

从reduceByKey执行过程分析ShuffleReader

 

获取本地数据:

没啥好说的,就是从本地的blockManager里面获取对应的数据,然后加入到result队列中

从reduceByKey执行过程分析ShuffleReader

 

 

 

参考:

    《图解Spark核心技术与案例实战》

 

 

相关文章: