本文只是结合源码对spark的map和reduce过程做简单的总结,不是特别深入。主要是《深入理解Spark__核心思想与源码分析》一书中的内容和自己的一些理解。

1.Shuffle

    不管是hadoop的MapReduce还是spark的map-reduce过程,核心过程都叫做shuffle,MapReduce的shuffle过程,要对每个分区的数据进行排序,然后merge,完成后输出到下游的reduce进行进一步的操作,spark的shuffle过程类似,但是又有一些不同。

    Spark的shuffle过程发展主要分成了三个阶段:第一阶段,每个map阶段的每个分区的数据都要生成N个小文件,分别对应下游的N个reducer,或者说是对应下游reducer要处理的N个分区,这样的话就需要生成好多个小文件,浪费资源和计算性能;在第二阶段,每个map分区可能需要k个core来处理,那么就讲每个core对应的map分区数据放到同一个文件中,这个在一定程度上是减少了文件数量;第三阶段就讲map阶段每个分区的数据先进行排序,和分区划分到一个文件中,也就是每个分区只产生一个文件,供reduce来读取,这里的分区是逻辑上的分区,并不是实际物理上的分区,并且在map的阶段也可以对数据进行聚合和排序(聚合和排序不是必须的,有些算子是不需要聚合排序了)。

    这里的过程说的相对简单,有兴趣的可以自己深入了解。

2. Map端计算结果缓存

     Map计算只是对数的第一步处理,处理结果要缓存起来,以便后面的reducer去读取数据。在整个数据处理的过程中,如果数据量较少,每多一步对数据的处理,就会增加复杂度,反过来数据量较大的时候,如果能够对数据按照一定的方式预处理,那么就反而会较少整体的处理时间,所以为了整体的优化,map端的对数据的缓存也做了不同的预处理方式。

 

Spark计算流程分析(map-reduce)过程

对于byPassMergeSort参数来说,我们知道reduceByKey是在map端进行聚合,sortByKey也是对数据进行了map端的排序,对于类似的算子,这样的操作大大的提高了计算的效率。当然也有不需要进行聚合排序的算子,如果仅进行了聚合和排序反而画蛇添足了。

聚合和排序是分开的,根据上述参数的不同,spark将map端的缓存分了三种方式:

Spark计算流程分析(map-reduce)过程

2.1 map端计算结果缓存聚合

    聚合简单来说就是对key相同的数据对应的value进行操作。如果我们最后得出的结果是聚合的,那么对map端数据进行初步的聚合,就可以大大减少数据量,相当于减少了不同节点间的网络IO。书中解释的很详细,少一句话就不行,所以就把书中所有的都拿上来了,后面加一点自己的理解。

由于有聚合,所以:

Spark计算流程分析(map-reduce)过程Spark计算流程分析(map-reduce)过程Spark计算流程分析(map-reduce)过程

    其实仔细观察上述过程,和Java中的hashMap的实现是有点类似的,都是按照Key取hashcode,然后利用hashcode进行取值。上面的解释有我们要申请一个数据长度2倍的数组,为什么呢?看后面changeValue中的代码我们就可以发现,每个key和其对应的value都是连续存储的([k1,v1,k2,v2,k3,v3……]),所以在找curKey的时候要用data(2*pos),而value就是data(2*pos + 1)。由于数组是引用类型的,所以key和value是完全可以放到同一个数组中的。

    当然这里的数据也和HashMap一样也要面临扩容的问题,就是要申请一个原始容量2倍的数组,然后按照同样的规则将数据重新放到新的数组中,详情可以自己参考书上内容,这里不展开说了。所以提前确定数组的容量特别重要,毕竟复制一次数据到新的数组中代价还是不小的。Spark通过抽样确定数据的原始大小,从而可以估算数据量的大小,说实话,书上的那个采样算法每太看明白,有能看明白的可以指教一下。

2.2 计算结果简单缓存

    简单缓存就是,没有定义聚合,那就直接将数据缓存在数组中。

 

3. 排序和分组

    进行排序和分组的时候,就需要不同额比较器,目前有3中比较器:

Spark计算流程分析(map-reduce)过程

Spark计算流程分析(map-reduce)过程

    这三种比较器也比较简单,不需要做过多的解释,但是什么时候使用哪个比较器呢?看下面代码:

    Spark计算流程分析(map-reduce)过程

    bypassMergeSort是判断是否到reduce端再聚合,所以如果是true则不需要排序,所以传进去的是partitionComparator比较器,即不对元数据进行排序,但是还是对元素进行按照分区排序了。上面的partitionIterator是每个分区的迭代器,所以这里面的代码可以看做是对一个分区内的元素的操作。

    不管是传入什么比较器,都用到了collection.destructiveSortedIterator方法。所以这个方法实现如下:

Spark计算流程分析(map-reduce)过程

Spark计算流程分析(map-reduce)过程

    Map端在对数据进行处理的时候,我们总是会提到分区的问题,即对map端每个分区的数据进行分到后续处理的不同分区,其实在这里真的对数据进行分区了吗,不是的,这里我们只是对数据进行了标记,标记每条数据数据下游的那个分区,也只是逻辑上的分区,但是他们还是在物理上的同一个文件中,这也就是map端聚合排序的真相。

    但是我们如何区分他们属于哪个分区呢?即看上面代码,我们都调用了groupByPartition函数。

Spark计算流程分析(map-reduce)过程

    看代码可以知道通过map对不同partition的数据进行提取,通过data.head._1._1 == partitionId和(elem._1._2,elem._2)两个代码可以看出,原始数据格式为((partitionId, Key),Value)。

    我们在map阶段对数据进行分区,并且按照分区排序,这个都是为下游的reduce阶段准备,但是我们如何能够知道这个分区的规则?所以在每个map端分区之外,我们还要记录每个分区的索引文件,来记录每个分区数据的范围:

Spark计算流程分析(map-reduce)过程

 

4. Reduce端获取数据

    Map端已经对数据进行了一步预处理,已经对数据进行了逻辑分区,而reduce端就需要将逻辑分区转化为具体的物理分区。这个就需要map阶段的索引文件来表示每个reduce端数据分区的范围。

    Reduce端获取数据需要分为从本地获取和从远程获取。不管是从远程还是本地获取,由于上游数据是好多块儿的,所以获取的数据都会先放到一个LinkedBlockingQueue[FetchResult]对象中,每个FetchResult就代表每个Map分区中的一段数据。所以最后对LinkedBlockingQueue[FetchResult]中的多个fetchResult迭代将其整合为一个整体的数据。对于reduce端数据聚合和排序,就还是使用AppendOnlyMap对象进行排序和聚合,和前面说的就差不多了。

5. 最后给出一个总结

Spark计算流程分析(map-reduce)过程Spark计算流程分析(map-reduce)过程Spark计算流程分析(map-reduce)过程

    当然这里是将中间文件刷出到中间文件,也就是落到了磁盘上,不过不管刷出到磁盘还是放到内存,数据的转换思想是不变的,结合上面的3个过程,就可以对整个过程有个大概的感性的认识。

相关文章: