简介
Spark 算子大致可以分为以下两类:
-
Transformation 变换/转换算子:这种变换并不触发提交作业,完成作业中间过程处理。
Transformation 操作是延迟计算的,也就是说从一个RDD 转换生成另一个 RDD 的转换操作不是马上执行,需要等到有 Action 操作的时候才会真正触发运算。
-
Action 行动算子:这类算子会触发 SparkContext 提交 Job 作业。
Action 算子会触发 Spark 提交作业(Job),并将数据输出 Spark系统。
| Transformation | Actions |
|---|---|
| map(f:T=>U) : RDD[T]=>RDD[U] | count():RDD[T]=>Long |
| filter(f:T=>Bool) : RDD[T]=>RDD[T] | collect():RDD[T]=>Seq[T] |
| flatMap(f:T=>Seq[U]) : RDD[T]=>RDD[U] | reduce(f:(T,T)=>T):RDD[T]=>T |
| sample(fraction:Float) : RDD[T]=>RDD[T] | lookup(k:K):RDD[(K,V)]=>Seq[V] |
| groupByKey() : RDD[(K,V)]=>RDD[(K,Seq[V])] | save(path:String):Ouputs RDD to a storage system,e.g:HDFS |
| reduceByKey(f:(V,V)=>V) : RDD[(K,V)]=>RDD[(K,V)] | |
| union() : (RDD[T],RDD[T])=>RDD[T] | |
| join():(RDD[K,V],RDD[K,W])=>RDD[(K,(V,W))] | |
| cogroup():(RDD[K,V],RDD[K,W])=>RDD[(K,(Seq[V],Seq[W]))] | |
| crossProduct(): (RDD[T],RDD[U])=>RDD[(T,U)] | |
| mapValues(f:V=>W) : RDD[(K,V)]=>RDD[(K,W)] | |
| sort(c:Comparator[K]) : RDD[(K,V)]=>RDD[(K,V)] | |
| partitionBy(p:Partitioner[K]):RDD[(K,V)]=>RDD[(K,V)] |
Transformations 转换算子
Transformations类算子叫做转换算子,如map,flatMap,reduceByKey等。Transformations算子是延迟执行,也叫懒加载执行
-
filter
过滤符合条件的记录数,true保留,false过滤掉。
-
map
将一个RDD中的每个数据项,通过map中的函数映射变为一个新的元素。
特点:输入一条,输出一条数据。
-
flatMap
先map后flat
与map类似,每个输入项可以映射为0到多个输出项。
-
sample
随机抽样算子,根据传进去的小数按比例进行又放回或者无放回的抽样。 -
reduceByKey
将相同的Key根据相应的逻辑进行处理。
-
sortByKey/sortBy
作用在K,V格式的RDD上,对key进行升序或者降序排序。
Action行动算子
Action类算子叫做行动算子,如foreach,collect,count等。Action类算子是触发执行。一个application应用程序中有几个Action类算子执行,就有几个job运行
-
count
返回数据集中的元素数。会在结果计算完成后回收到Driver端。
-
take(n)
返回一个包含数据集前n个元素的集合。
-
first
first=take(1),返回数据集中的第一个元素。
-
foreach
循环遍历数据集中的每个元素,运行相应的逻辑。
-
collect
将计算结果回收到Driver端。
控制算子
控制算子有三种,cache,persist,checkpoint,以上算子都可以将RDD持久化,持久化的单位是partition。
cache和persist都是懒执行的。必须有一个action类算子触发执行。
checkpoint算子不仅能将RDD持久化到磁盘,还能切断RDD之间的依赖关系。
-
cache
默认将RDD的数据持久化到内存中。cache是懒执行。
注意:chche () = persist()=persist(StorageLevel.Memory_Only)
-
persist:
可以指定持久化的级别。最常用的是MEMORY_ONLY和MEMORY_AND_DISK。”_2”表示有副本数。
持久化级别如下:
cache和persist的注意事项:
-
cache和persist都是懒执行,必须有一个action类算子触发执行。
-
cache和persist算子的返回值可以赋值给一个变量,在其他job中直接使用这个变量就是使用持久化的数据了。持久化的单位是partition。
-
cache和persist算子后不能立即紧跟action算子。
错误:rdd.cache().count() 返回的不是持久化的RDD,而是一个数值了。
-
-
checkpoint
checkpoint将RDD持久化到磁盘,还可以切断RDD之间的依赖关系。
checkpoint 的执行原理:
- 当RDD的job执行完毕后,会从finalRDD从后往前回溯。
- 当回溯到某一个RDD调用了checkpoint方法,会对当前的RDD做一个标记。
- Spark框架会自动启动一个新的job,重新计算这个RDD的数据,将数据持久化到HDFS上。
- 优化:对RDD执行checkpoint之前,最好对这个RDD先执行cache,这样新启动的job只需要将内存中的数据拷贝到HDFS上就可以,省去了重新计算这一步。