groupByKey是Transformation并且产生shuffle

上源码

Spark:GroupByKey算子解刨

可以看到调的是一个分区器里面传的是父RDD,参数里面传入一个参数和一个迭代器

 

Spark:GroupByKey算子解刨

传入三个函数,一个分区和一个是否聚合value布尔值

第一个函数装到CompactBuffer,这个不ArrayBuffer更高效,他将迭代器中的第一个value取出来装进CompactBuffer

第二函数是将CompactBuffer的value追加相同key的value

前二步是局部聚合

第三个函数是在全局聚合中每个分区的value进行追加

自己定义GroupByKey

 

import org.apache.spark.rdd.{RDD, ShuffledRDD}
import org.apache.spark.{Aggregator, HashPartitioner, SparkConf, SparkContext}

import scala.collection.mutable.ArrayBuffer

object MyGroupByKey {

  def main(args: Array[String]): Unit = {

//连接spark
    val conf = new SparkConf().setAppName("MyGroupByKey").setMaster("local[*]")
    val sc = new SparkContext(conf)

    val rdd: RDD[String] = sc.parallelize(List(
      "hadoop", "fink", "spark", "hive",
      "hbase", "spark", "spark", "hadoop",
      "spark", "linux", "kafuka", "ak",
      "fink", "hive", "hive", "hbase"

    ))
//变成k,v形式对偶元组
    val rdd2 = rdd.map((_, 1))
 
   //调取底层shuffledRDD.
参数1 K:String类型
参数2:value  int类型
参数3:ArrayBuffer  数组
因为源码中,将第一个参数装入CompactBuffer里,但是CompactBuffer是spark私有的,不能调所有我们用ArrayBuffer
    val value = new ShuffledRDD[String, Int, ArrayBuffer[Int]](rdd2, new HashPartitioner(rdd2.partitions.length))//newHashPartition分区器
//将第一个value装入ArrayBuffer中
    val  createCombiner=  (x:Int)=>ArrayBuffer(x)
//将value和相同key的value相加
    val mergeValue=(a:ArrayBuffer[Int],b:Int)=>a+=b
//将每个分区的ArrayBuffer的value相加
    val mergeCombiners=(q:ArrayBuffer[Int],w:ArrayBuffer[Int])=>q++=w

    value.setAggregator(new Aggregator[String,Int,ArrayBuffer[Int]](
      createCombiner,
      mergeValue,
      mergeCombiners

    ))




    value.setMapSideCombine(false)
    value.saveAsTextFile("shffled-group")

  }
}

 

 

 

 

相关文章:

  • 2021-12-11
  • 2021-11-02
  • 2022-12-23
  • 2021-12-26
  • 2021-11-28
  • 2021-05-06
  • 2021-11-30
猜你喜欢
  • 2023-02-09
  • 2021-09-29
  • 2021-12-13
  • 2021-12-29
  • 2023-02-06
  • 2022-03-07
  • 2021-10-07
相关资源
相似解决方案