spark的shuffle两种实现
在Spark 1.2以前,默认的shuffle计算引擎是HashShuffleManager。该ShuffleManager而HashShuffleManager有着一个非常严重的弊端,就是会产生大量的中间磁盘文件,进而由大量的磁盘IO操作影响了性能。
在Spark 1.2以后的版本中,默认的ShuffleManager改成了SortShuffleManager。SortShuffleManager相较于HashShuffleManager来说,有了一定的改进。主要就在于,每个Task在进行shuffle操作时,虽然也会产生较多的临时磁盘文件,但是最后会将所有的临时文件合并(merge)成一个磁盘文件,因此每个Task就只有一个磁盘文件。在下一个stage的shuffle read task拉取自己的数据时,只要根据索引读取每个磁盘文件中的部分数据即可。
spark shuffle详解:
spark的shuffle:map端的shuffle称之为shuffle write,在reduce端的shuffle称之为shuffle read
在Spark 1.2以前,默认的shuffle计算引擎是HashShuffleManager。该ShuffleManager而HashShuffleManager有着一个非常严重的弊端,就是会产生大量的中间磁盘文件,进而由大量的磁盘IO操作影响了性能。因此在Spark 1.2以后的版本中,默认的ShuffleManager改成了SortShuffleManager。SortShuffleManager相较于HashShuffleManager来说,有了一定的改进。主要就在于,每个Task在进行shuffle操作时,虽然也会产生较多的临时磁盘文件,但是最后会将所有的临时文件合并(merge)成一个磁盘文件,因此每个Task就只有一个磁盘文件。在下一个stage的shuffle read task拉取自己的数据时,只要根据索引读取每个磁盘文件中的部分数据即可。
此次我只介绍新版本的shuffle:sortshuffle
sortshufflemanager的运行机制主要分为两种,一种是普通运行机制,另一种是bypass运行机制。当shuffle read task 的数量小于spark.shuffle.sort.bypassMergeThreshold参数的值时(默认是200),就会启用bypass机制。
sortshuffle的普通机制:
(1)写入内存数据结构
数据会先写入一个内存数据结构中(默认是5M),此时根据不同的shuffle算子,可能选用不同的数据结构。如果是聚合类操作,选用map数据结构,一边聚合一边写入内存,如果是join,那么就选用Array的数据结构,直接写入内存。接着,每写一条数据进入内存数据结构之后,就会判断一下,是否达到了某个临界阈值。如果达到临界阈值的话,那么就会尝试将内存数据结构中的数据溢写到磁盘(会先写到内存缓冲区),然后清空内存数据结构。
(2)排序
在溢写到磁盘文件之前,会先根据key对内存数据结构中已有的数据进行排序。
(3)溢写
对于排序之后的数据 ,会分批写入磁盘文件,数据会以每批一万条写入磁盘文件。写入磁盘文件通过缓冲区溢写的方式,每次溢写都会产生一个磁盘文件,也就是说一个task会产生多个临时文件。
(4)merge归并排序
将所有的临时磁盘文件都进行合并,写入最终的磁盘文件中。再写一份索引文件,标识了下游各个task需要的数据在这个磁盘文件中的start offset和end offset
注意:一个map task会产生一个索引文件和磁盘大文件