spark算子介绍

Spark的算子的分类

1、从大方向来说,Spark 算子大致可以分为以下两类:

1)Transformation 变换/转换算子:这种变换并不触发提交作业,完成作业中间过程处理。

Transformation 操作是延迟计算的,也就是说从一个RDD 转换生成另一个 RDD 的转换操作不是马上执行,需要等到有 Action 操作的时候才会真正触发运算。

2)Action 行动算子:这类算子会触发 SparkContext 提交 Job 作业。

Action 算子会触发 Spark 提交作业(Job),并将数据输出 Spark系统。

2、从小方向来说,Spark 算子大致可以分为以下三类:

1)Value数据类型的Transformation算子,这种变换并不触发提交作业,针对处理的数据项是Value型的数据。
2)Key-Value数据类型的Transfromation算子,这种变换并不触发提交作业,针对处理的数据项是Key-Value型的数据对。
3)Action算子,这类算子会触发SparkContext提交Job作业。

1)Value数据类型的Transformation算子  

  一、输入分区与输出分区一对一型

    1、map算子

    2、flatMap算子

    3、mapPartitions算子

    4、glom算子

  二、输入分区与输出分区多对一型 

    5、union算子

    6、cartesian算子

  三、输入分区与输出分区多对多型

    7、grouBy算子

  四、输出分区为输入分区子集型

    8、filter算子

    9、distinct算子

    10、subtract算子

    11、sample算子

        12、takeSample算子

   五、Cache型

    13、cache算子  

    14、persist算子

 

2)Key-Value数据类型的Transfromation算子

  一、输入分区与输出分区一对一

    15、mapValues算子

  二、对单个RDD或两个RDD聚集

   单个RDD聚集

    16、combineByKey算子

    17、reduceByKey算子

    18、partitionBy算子

   两个RDD聚集

    19、Cogroup算子

  三、连接

    20、join算子

    21、leftOutJoin和 rightOutJoin算子

 3)Action算子

  一、无输出

    22、foreach算子

  二、HDFS

    23、saveAsTextFile算子

    24、saveAsObjectFile算子

  三、Scala集合和数据类型

    25、collect算子

    26、collectAsMap算子

      27、reduceByKeyLocally算子

      28、lookup算子

    29、count算子

    30、top算子

    31、reduce算子

    32、fold算子

    33、aggregate算子

常用Transformation

1、parallelize

#通过并行化scala集合创建RDD
val rdd1 = sc.parallelize(Array(1,2,3,4,5,6))
#查看该rdd的分区数量
rdd1.partitions.length

spark-常用算子

2、sortBy

升序排序
val rdd2 = sc.parallelize(List(5,6,4,7,3,8,2,9,1,10)).map(_*2).sortBy(x=>x,true)

spark-常用算子
val rdd2 = sc.parallelize(List(5,6,4,7,3,8,2,9,1,10)).map(_*2).sortBy(x=>x+"",true)
spark-常用算子

val rdd2 = sc.parallelize(List(5,6,4,7,3,8,2,9,1,10)).map(_*2).sortBy(x=>x.toString,true) 

spark-常用算子

3、filter 

过滤
val rdd3 = rdd2.filter(_>10)

spark-常用算子 //字典序排序

4、flatMap
//切割压平

val rdd4 = sc.parallelize(Array("a b c","d e f","h i j"))
rdd4.flatMap(_.split(" ")).collect 

spark-常用算子

val rdd5 = sc.parallelize(List(List(“a b c”,“a b b”),List(“e f g”,“a f g”),List(“h i j”,“a a b”)))
rdd5.flatMap(_.flatMap(_.split(” “))).collect

spark-常用算子

5、union

union求并集,注意类型要一致
val rdd6 = sc.parallelize(List(5,6,4,7))
val rdd7 = sc.parallelize(List(1,2,3,4))
val rdd8 = rdd6.union(rdd7)

spark-常用算子

6、distinct

去除相同的元素
rdd8.distinct.sortBy(x=>x).collect

spark-常用算子

7、intersection

intersection求交集
val rdd9 = rdd6.intersection(rdd7)

spark-常用算子

8、join、leftOuterJoin、rigthOuterJoin

val rdd1 = sc.parallelize(List((“tom”,1),(“jerry”,2),(“kitty”,3)))
val rdd2 = sc.parallelize(List((“jerry”,9),(“tom”,8),(“shuke”,7)))
#join
val rdd3 = rdd1.join(rdd2)
val rdd3 = rdd1.leftOuterJoin(rdd2)
val rdd3 = rdd1.rigthOuterJoin(rdd2)

spark-常用算子

spark-常用算子

spark-常用算子

9、groupByKey
val rdd3 = rdd1.union(rdd2)
rdd3.groupByKey
rdd3.groupByKey.map(x=>(x._1,x._2.sum))

spark-常用算子

spark-常用算子

10、cogroup
val rdd1 = sc.parallelize(List((“tom”,1),(“tom”,2),(“jerry”,3),(“kitty”,2)))
val rdd2 = sc.parallelize(List((“jerry”,2),(“tom”,1),(“shuke”,2)))
val rdd3 = rdd1.cogroup(rdd2)
val rdd4 = rdd3.map(t=>(t._1,t._2._1.sum+t._2._2.sum))
spark-常用算子
spark-常用算子

11、cartesian

笛卡尔积
val rdd1 = sc.parallelize(List(“tom”,“jerry”))
val rdd2 = sc.parallelize(List(“tom”,“kitty”,“shuke”))
val rdd3 = rdd1.cartesian(rdd2)
spark-常用算子

二、常用action

1、collect

val rdd1 = sc.parallelize(List(1,2,3,4,5),2)
rdd1.collect
spark-常用算子
2、reduce

val rdd1 = sc.parallelize(List(1,2,3,4,5),2)
val rdd2 = rdd1.reduce(_+_)
spark-常用算子
3、count

val rdd1 = sc.parallelize(List(1,2,3,4,5),2)
rdd1.count
spark-常用算子
4、top

val rdd1 = sc.parallelize(List(1,2,3,4,5),2)
rdd1.top(2)
spark-常用算子
5、take

val rdd1 = sc.parallelize(List(1,2,3,4,5),2)
rdd1.take(2)
spark-常用算子
6、first(与take(1)相似)

val rdd1 = sc.parallelize(List(1,2,3,4,5),2)
rdd1.first
spark-常用算子
7、takeOrdered

val rdd1 = sc.parallelize(List(1,2,3,4,5),2)
rdd1.takeOrdered(3)
spark-常用算子

 高阶算子

1、mapPartitionsWithIndex

把每个partition中的分区号和对应的值拿出来
val func = (index: Int, iter: Iterator[(Int)]) => {
iter.toList.map(x => “[partID:” + index + “, val: " + x + “]”).iterator
}
val rdd1 = sc.parallelize(List(1,2,3,4,5,6,7,8,9),2)
rdd1.mapPartitionsWithIndex(func).collect
spark-常用算子
2、aggregate
def func1(index: Int, iter: Iterator[(Int)]) : Iterator[String] = {
iter.toList.map(x => “[partID:” + index + “, val: " + x + “]”).iterator
}
val rdd1 = sc.parallelize(List(1,2,3,4,5,6,7,8,9),2)
rdd1.mapPartitionsWithIndex(func1).collect
###是action操作, 第一个参数是初始值, 二:是2个函数[每个函数都是2个参数(第一个参数:先对个个分区进行合并, 第二个:对个个分区合并后的结果再进行合并), 输出一个参数]
###0 + (0+1+2+3+4 + 0+5+6+7+8+9)
rdd1.aggregate(0)(_+_, _+_)
rdd1.aggregate(0)(math.max(_, _), _ + _)
###6和1比, 得6再和234比得6 -->6和6789比,得9 --> 6 + (6+9)
rdd1.aggregate(5)(math.max(_, _), _ + _)
spark-常用算子
spark-常用算子
val rdd2 = sc.parallelize(List(“a”,“b”,“c”,“d”,“e”,“f”),2)
def func2(index: Int, iter: Iterator[(String)]) : Iterator[String] = {
iter.toList.map(x => “[partID:” + index + “, val: " + x + “]”).iterator
}
“”+”“abc+”“def->abcdef
rdd2.rdd2.aggregate(””)(_ + _, _ + _)
=+=abc+=def->==abc=def
rdd2.aggregate(”=")(_ + _, _+ _)
spark-常用算子
val rdd3 = sc.parallelize(List(“12”,“23”,“345”,“4567”),2)
“”(0).length和"12"(2).length比较,toString->“2”;“2”(1).length再与"23"(2).length比较->2
“”.(0)length和"345"(3).length比较,toString->“3”;“3”(1).legth再与"4567"(4).length比较->4
因为是并行执行的,结果有可能为24也有可能为42
rdd3.aggregate("")((x,y) => math.max(x.length, y.length).toString, (x,y) => x + y)
val rdd4 = sc.parallelize(List(“12”,“23”,“345”,""),2) 结果(10,01)
rdd4.aggregate("")((x,y) => math.min(x.length, y.length).toString, (x,y) => x + y)
val rdd5 = sc.parallelize(List(“12”,“23”,"",“345”),2) 结果(11)
rdd5.aggregate("")((x,y) => math.min(x.length, y.length).toString, (x,y) => x + y)
spark-常用算子
spark-常用算子
spark-常用算子3、

 

3、aggregateByKey
val pairRDD = sc.parallelize(List( (“cat”,2), (“cat”, 5), (“mouse”, 4),(“cat”, 12), (“dog”, 12), (“mouse”, 2)), 2)
def func2(index: Int, iter: Iterator[(String, Int)]) : Iterator[String] = {
iter.toList.map(x => “[partID:” + index + “, val: " + x + “]”).iterator
}
pairRDD.mapPartitionsWithIndex(func2).collect
partID=0(cat)0与2比较->2与5比较->5
(mouse)0与4比较->4
partID=1(cat)0与12比较->12
(dog)0与12比较->12
(mouse)0与2比较->2
(cat,5+12) (mouse,4+2) (dog,12)
pairRDD.aggregateByKey(0)(math.max(_, _), _+ _).collect
pairRDD.aggregateByKey(100)(math.max(_, _), _+ _).collect
spark-常用算子
 

4、combineByKey 

和reduceByKey是相同的效果
###第一个参数x:原封不动取出来, 第二个参数:是函数, 局部运算, 第三个:是函数, 对局部运算后的结果再做运算
###每个分区中每个key中value中的第一个值, (hello,1)(hello,1)(good,1)–>(hello(1,1),good(1))–>x就相当于hello的第一个1, good中的1
val rdd1 = sc.textFile(“hdfs://cdh:9000/SparkWordCount/words.txt”).flatMap(_.split(” “)).map((_, 1))
val rdd2 = rdd1.combineByKey(x => x, (a: Int, b: Int) => a + b, (m: Int, n: Int) => m + n)
rdd1.collect
rdd2.collect
spark-常用算子
 

5、countByKey
val rdd1 = sc.parallelize(List((“a”, 1), (“b”, 2), (“b”, 2), (“c”, 2), (“c”, 1)))
rdd1.countByKey
rdd1.countByValue
spark-常用算子
 

6、filterByRange(范围过滤)
val rdd1 = sc.parallelize(List((“e”, 5), (“c”, 3), (“d”, 4), (“c”, 2), (“a”, 1),(“b”, 6))))
val rdd2 = rdd1.filterByRange(“b”, “d”)
rdd2.collect
spark-常用算子
 

7、foldByKey
val rdd1 = sc.parallelize(List(“dog”, “wolf”, “cat”, “bear”), 2)
val rdd2 = rdd1.map(x => (x.length, x))
val rdd3 = rdd2.foldByKey(”")(_+_)
spark-常用算子
 

8、keyBy

以传入的参数做key
val rdd1 = sc.parallelize(List(“dog”, “salmon”, “salmon”, “rat”, “elephant”), 3)
val rdd2 = rdd1.keyBy(_.length)
rdd2.collect
spark-常用算子
9、keys、values
val rdd1 = sc.parallelize(List(“dog”, “tiger”, “lion”, “cat”, “panther”, “eagle”), 2)
val rdd2 = rdd1.map(x => (x.length, x))
rdd2.keys.collect
rdd2.values.collect
spark-常用算子

原文:
https://blog.csdn.net/qq_32595075/article/details/79918644
http://www..com/article/9163165066/ 

相关文章: