aggregateByKey:

 

import org.apache.spark.SparkContext
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.SparkSession

object TransformationsDemo {
  def main(args: Array[String]): Unit = {
    val sparkSession = SparkSession.builder().appName("TransformationsDemo").master("local[1]").getOrCreate()
    val sc = sparkSession.sparkContext
    testAggregateByKey(sc)

  }


  private def testAggregateByKey(sc: SparkContext) = {
    var data = sc.parallelize(List((1,3),(1,2),(1,4),(2,3),(3,6),(3,8)),1)
    def seq(a:Int, b:Int) : Int ={
      println("seq: " + a + "\t " + b)
      math.max(a,b)
    }

    def comb(a:Int, b:Int) : Int ={
      println("comb: " + a + "\t " + b)
      a + b
    }

    data.aggregateByKey(0)(seq, comb).collect.foreach(println)
  }
}

运行结果:

Spark 学习笔记之 aggregateByKey

运行结果分析:  

根据Key值的不同,可以分为3个组:

(1)  (1,3),(1,2),(1,4);

(2)  (2,3);

(3)  (3,6),(3,8)。

这3个组分别进行seqOp,也就是(K,V)里面的V和0进行math.max()运算,运算结果和下一个V继续运算,以第一个组为例,运算过程是这样的:

0, 3 => 3

3, 2 => 3

3, 4 => 4

所以最终结果是(1,4)。

第二组结果是(2,3)。

第三组结果是(3,8)。

combOp是对把各分区的V加起来,由于这里并没有分区,所以实际上是不起作用的。

 

修改下代码,添加分区:

  private def testAggregateByKey(sc: SparkContext) = {
    var data = sc.parallelize(List((1,3),(1,2),(1,4),(2,3),(3,6),(3,8)),3)
    def seq(a:Int, b:Int) : Int ={
      println("seq: " + a + "\t " + b)
      math.max(a,b)
    }

    def comb(a:Int, b:Int) : Int ={
      println("comb: " + a + "\t " + b)
      a + b
    }

    data.aggregateByKey(0)(seq, comb).collect.foreach(println)
  }

运行结果:

Spark 学习笔记之 aggregateByKey

 

运行结果分析:

根据Key值的不同,可以分为3个区:

(1)  (1,3),(1,2);

(2)  (1,4),(2,3);

(3)  (3,6),(3,8)。

区内先做求最大值

第一组结果是(1,3)。

第二组结果是(1,4),(2,3)。

第三组结果是(3,8)。

combOp是对把各分区的V加起来,由于此次有分区,所以(1,3)和(1,4),做合并操作,结果:(1, 7)。

 

相关文章:

  • 2021-11-28
  • 2020-09-26
  • 2019-06-15
  • 2021-05-06
猜你喜欢
  • 2021-09-28
  • 2022-12-23
  • 2022-12-23
  • 2022-12-23
  • 2021-07-28
  • 2021-12-26
  • 2021-08-07
相关资源
相似解决方案