groupByKey是Transformation并且产生shuffle
上源码
可以看到调的是一个分区器里面传的是父RDD,参数里面传入一个参数和一个迭代器
传入三个函数,一个分区和一个是否聚合value布尔值
第一个函数装到CompactBuffer,这个不ArrayBuffer更高效,他将迭代器中的第一个value取出来装进CompactBuffer
第二函数是将CompactBuffer的value追加相同key的value
前二步是局部聚合
第三个函数是在全局聚合中每个分区的value进行追加
自己定义GroupByKey
import org.apache.spark.rdd.{RDD, ShuffledRDD} import org.apache.spark.{Aggregator, HashPartitioner, SparkConf, SparkContext} import scala.collection.mutable.ArrayBuffer object MyGroupByKey { def main(args: Array[String]): Unit = { //连接spark val conf = new SparkConf().setAppName("MyGroupByKey").setMaster("local[*]") val sc = new SparkContext(conf) val rdd: RDD[String] = sc.parallelize(List( "hadoop", "fink", "spark", "hive", "hbase", "spark", "spark", "hadoop", "spark", "linux", "kafuka", "ak", "fink", "hive", "hive", "hbase" )) //变成k,v形式对偶元组 val rdd2 = rdd.map((_, 1)) //调取底层shuffledRDD. 参数1 K:String类型 参数2:value int类型 参数3:ArrayBuffer 数组 因为源码中,将第一个参数装入CompactBuffer里,但是CompactBuffer是spark私有的,不能调所有我们用ArrayBuffer val value = new ShuffledRDD[String, Int, ArrayBuffer[Int]](rdd2, new HashPartitioner(rdd2.partitions.length))//newHashPartition分区器 //将第一个value装入ArrayBuffer中 val createCombiner= (x:Int)=>ArrayBuffer(x) //将value和相同key的value相加 val mergeValue=(a:ArrayBuffer[Int],b:Int)=>a+=b //将每个分区的ArrayBuffer的value相加 val mergeCombiners=(q:ArrayBuffer[Int],w:ArrayBuffer[Int])=>q++=w value.setAggregator(new Aggregator[String,Int,ArrayBuffer[Int]]( createCombiner, mergeValue, mergeCombiners )) value.setMapSideCombine(false) value.saveAsTextFile("shffled-group") } }