Spark是以Shuffle作为Stage的分界的,Shuffle分为ShuffleWrite和ShuffleRead两个过程。
ShuffleWriter负责将中间结果写到磁盘上,可以理解为是当前Stage结束的收尾操作;最后ShuffleRead用于读取ShuffleWrite生成的Shuffle文件,可以理解为它是下一个Stage开始的那个操作;
之前已经分析过三种ShuffleWrite的写方式了,现在来分析下ShuffleRead是怎么拉取磁盘上的数据的。
个人调试代码说明:
-
def main(args: Array[String]): Unit = { val sc = SparkHolder.getSparkContext() val rdd = sc.parallelize(1 to 4000, 4) .map(x => (x, List(x, x + 2, x + 1))) // .sortByKey() .reduceByKey((a, b) => a.++(b), 8) val rdd1 = rdd.map(x => (x._1, x._2.head)) .reduceByKey((a, b) => a + b) rdd1.count() }
提交Spark任务的命令如下:
/usr/lib/SPARK-**/bin/spark-submit --driver-java-options "-Xdebug -Xrunjdwp:transport=dt_socket,server=y,suspend=n,address=11123" trajectory-1.0-SNAPSHOT-jar-with-dependencies.jar
//Executor端调试设置
.set("spark.executor.extraJavaOptions","-XX:+HeapDumpOnOutOfMemoryError -XX:HeapDumpPath=/mnt/ssd1/spark/executor/dump -agentlib:jdwp=transport=dt_socket,server=y,suspend=y,address=11111")
代码的DAG图如下:
reduceByKey算子介绍:
从源码中可以看出,reduceByKey底层调用的是combineByKeyWithClassTag()方法:
解释一下上面4个参数:
createCombiner:组合器函数,用于将V类型转成C类型,即同一个Partition中,如果是新的key,调用这个函数进行处理
mergeValue:合并值函数,将一个C类型和一个V类型合并成C类型,即同一个Partition中,key已经遇到过,将当前值和以前的值进行合并
mergeCombiners:合并组合器函数,用于将两个C类型合并成一个C类型,即不同Partition中,同一个key的值调用该函数进行合并
parititioner:分区器,用于指定key reduce到哪一个分区中
从源码角度分析下ShuffleRead的流程:
ShuffleRead是从ShuffleRDD.compute()方法开始的,所以调试的时候从这里开始看进去。
“ReduceTask”怎么知道在哪些节点上执行:
这里说的“ReduceTask”可以是ShuffleMapTask,也可以是ResultTask,个人称它为ReduceTask是因为它要拉取上一次ShuffleWrite写到磁盘上的数据,这个过程称之为Reduce(也可以称为ShuffleRead)。
本人是以standalone的方式执行,怎么知道ShuffleRead的任务在哪些节点执行呢?看下ShuffleRDD的getPreferredLocations()方法:
内部是这样的:
下面是源码中对这些参数的解释:
小于2000这里解释说是和HighlyCompressedMapStatus有关,目前还不太明白,但个人猜测是Spark建议你partition的数量不要设置的过多。
0.2应该是个经验值,将它调大的话可以集中到更少的节点上,但是如果这些更少的节点都是busy的话,任务的执行会有延迟。
执行Shuffle Read操作:
每个“ReduceTask”会拉取属于自己那个Partition的数据:
核心是BlockStoreShuffleReader中的read()方法,接下来重点分析下该方法,该方法的主要流程如下:
-
初始化ShuffleBlockFetcherIterator,在初始化的过程中就已经获取到本地和远端的block数据了(详细流程再后面说,因为挺长的…)
-
对数据进行加密压缩,然后为获取到的数据分别构造inputStream
MaxBytesInFlight:单次请求默认最多获取48M数据,可以来自不同节点
MaxReqsInFlight:单次默认最多发送Int.maxValue个请求
3.如果dep.addregator.isDefined,那么就对数据进行agg
上面分析combineByKeyWithClassTag()是说过算子的作用,这里只是创建了一个ExternalAppendOnlyMap,然后将数据插入到这个map中进行agg。
ExternalAppendOnlyMap可以看之前分析“SorteShuffleWriter”那一篇文章,里面提到过AppendOnlyMap。这里如果数据太多的话,也会先溢出到磁盘。
4.如果指定了dep.keyOrdering,那么就对数据进行排序
对数据排序使用的是ExternalSorter,ExternalSorter也在那篇分析“SortShuffleWriter”那篇文章里面进行详细分析过。
虽然之前说过,这里再提一下,这里是调用PartitionedPairBuffer的insert()方法,如果数据太多的话,也会刷新到磁盘生成临时文件。排序是在获取迭代器的时候进行的,使用的是TimSort(优化后的归并排序)。
进一步详细分析下获取block的操作:
初始化ShuffleBlockFetcherIterator的过程中会调用它的initialize()方法,获取block数据就是在该方法中完成的:
获取数据时,如果block在本地,则通过本地的BlockManager获取,如果block在远端,则通过ShuffleClient请求远程节点上的BlockTransferService获取。
区分远端与本地数据:
这里会限制每次远程最多可以拉取maxBytesInFlight/5大小的数据(按照默认值就是48M/5=9.6M),注释中也说明了,这样是为了能够从多个节点并行进行拉取。
遍历远端数据的信息,然后拼接拉取数据的请求(FetchRequest):
获取远端数据:
指定IP/Port/ExecutorId然后发送请求获取数据,然后将结果放入result队列,result保存的就是拉取到的数据
获取本地数据:
没啥好说的,就是从本地的blockManager里面获取对应的数据,然后加入到result队列中
参考:
《图解Spark核心技术与案例实战》