1 shuffle介绍
在Hadoop的MapReduce框架中,shuffle是连接Map和Reduce之间的桥梁,Map的输出要用到Reduce中必须经过Shuffle这个环节。由于Shuffle阶段涉及磁盘的读写和网络传输,因此Shuffle的性能高低直接影响到整个程序的性能和吞吐量。Hadoop的MapReduce流程,其中Shuffle阶段是介于Map和Reduce阶段之间。
Shuffle 中文翻译为 “ 洗牌 ” ,需要 Shuffle 的关键性原因是某种具有共同特征的数据需要最终汇聚到一个计算节点上进行计算。把这些分布在不同节点的数据按照一定的规则聚集到一起的过程称为Shuffle。spark作为MapReduce框架的一种实现,也实现了Shuffle的逻辑。
2 shuffle原理
概述:Shuffle描述着数据从map task输出到reduce task输入的这段过程。在分布式情况下,reduce task需要跨节点去拉取其它节点上的map task结果。这一过程将会产生网络资源消耗和内存,磁盘IO的消耗。
3 spark RDD中的shuffle算子
3.1 去重:
def distinct()
def distinct(numPartitions: Int)
3.2 聚合
def reduceByKey(func: (V, V) => V, numPartitions: Int): RDD[(K, V)]
def reduceByKey(partitioner: Partitioner, func: (V, V) => V): RDD[(K, V)]
def groupBy[K](f: T => K, p: Partitioner):RDD[(K, Iterable[V])]
def groupByKey(partitioner: Partitioner):RDD[(K, Iterable[V])]
def aggregateByKey[U: ClassTag](zeroValue: U, partitioner: Partitioner): RDD[(K, U)]
def aggregateByKey[U: ClassTag](zeroValue: U, numPartitions: Int): RDD[(K, U)]
def combineByKey[C](createCombiner: V => C, mergeValue: (C, V) => C, mergeCombiners: (C, C) => C): RDD[(K, C)]
def combineByKey[C](createCombiner: V => C, mergeValue: (C, V) => C, mergeCombiners: (C, C) => C, numPartitions: Int): RDD[(K, C)]
def combineByKey[C](createCombiner: V => C, mergeValue: (C, V) => C, mergeCombiners: (C, C) => C, partitioner: Partitioner, mapSideCombine: Boolean = true, serializer: Serializer = null): RDD[(K, C)]
3.3 排序
def sortByKey(ascending: Boolean = true, numPartitions: Int = self.partitions.length): RDD[(K, V)]
def sortBy[K](f: (T) => K, ascending: Boolean = true, numPartitions: Int = this.partitions.length)(implicit ord: Ordering[K], ctag: ClassTag[K]): RDD[T]
3.4 重分区
def coalesce(numPartitions: Int, shuffle: Boolean = false, partitionCoalescer: Option[PartitionCoalescer] = Option.empty)
def repartition(numPartitions: Int)(implicit ord: Ordering[T] = null)
3.5集合或者表操作
def intersection(other: RDD[T]): RDD[T]
def intersection(other: RDD[T], partitioner: Partitioner)(implicit ord: Ordering[T] = null): RDD[T]
def intersection(other: RDD[T], numPartitions: Int): RDD[T]
def subtract(other: RDD[T], numPartitions: Int): RDD[T]
def subtract(other: RDD[T], p: Partitioner)(implicit ord: Ordering[T] = null): RDD[T]
def subtractByKey[W: ClassTag](other: RDD[(K, W)]): RDD[(K, V)]
def subtractByKey[W: ClassTag](other: RDD[(K, W)], numPartitions: Int): RDD[(K, V)]
def subtractByKey[W: ClassTag](other: RDD[(K, W)], p: Partitioner): RDD[(K, V)]
def join[W](other: RDD[(K, W)], partitioner: Partitioner): RDD[(K, (V, W))]
def join[W](other: RDD[(K, W)]): RDD[(K, (V, W))]
def join[W](other: RDD[(K, W)], numPartitions: Int): RDD[(K, (V, W))]
def leftOuterJoin[W](other: RDD[(K, W)]): RDD[(K, (V, Option[W]))]
4 通过reduceByKey和groupByKey的区别来理解Shuffle
reduceByKey算子:在数据被移动前,同一台机器上同样的key先被计算。然后在进行数据移动,再按照同样的key进行计算。
groupByKey算子:数据被移动后按照同样的key进行计算。
5 参考博客
【深入理解groupByKey、reduceByKey】:https://www.jianshu.com/p/0c6705724cff
【Spark Shuffle原理、Shuffle操作问题解决和参数调优】:https://www.cnblogs.com/arachis/p/Spark_Shuffle.html
【Spark Shuffle详解】:https://blog.csdn.net/snail_gesture/article/details/50803592