Spark之RDD
RDD的概述:
1.1什么是RDD RDD(Ressilient Distributed Dataset)名为弹性(容错)分布式数据集,是一个逻辑上的概念,实际上没有数据,是Spark中最基本的数据抽象。它代表一个不可变,可分区,元素能够用于并行计算的的集合。该类包含所有rdd上可用的基本操作,如:”map”, ”filter”****。
什么是弹性:
\1. RDD可以再内存和磁盘之间进行手动或者自动切换
\2. RDD可以通过转换成为其他的RDD
\3. RDD可以存储任意类型数据
RDD流式计算任务可描述为:从稳定的物理存储(分布式文件系统)中加载记录,记录被传入由一组确定性操作构成的DAG,然后写回稳定存储。另外RDD还可以将数据集缓存到内存中,使得在多个操作之间可以重用数据集,基于这个特点可以很方便地构建迭代型应用或者交互式数据分析应用。可以说Spark最初也就是实现RDD的一个分布式系统,后面通过不断发展壮大成为现在较为完善的大数据生态系统,简单来讲,Spark-RDD的关系类似于Hadoop-MapReduce关系。
KV类型的RDD的byKey方法隐式将rdd转换成PairRDDFunctions.
**RDD具有数据模型的特点:自动容错,位置感知性调度和可伸缩性,**RDD允许用户在执行多个查询时显式地将工作集缓存在内存中,后续的查询能够重用工作集,这极大地提升了查询速度。
RDD表示只读的分区的数据集,对RDD进行改动,只能通过RDD的转换操作,由一个RDD得到一个新的RDD,新的RDD包含了从其他RDD衍生所必需的信息。
1.2 RDD的五大内置属性:
(1).分区列表(A list of Partitions)
分区对象中包含数据的范围
和Hadoop一样的,能进行切分的数据才能并行计算。可以在创建RDD时就进行指定,如果没有指定则会采用默认值,默认值就是CPU Core数目
(2) 计算每个切片的函数(A funtion for computing each split) 算法(算子),算法封装,直接操作分区与java集合相反
每个RDD都会实现Compute函数,comopute函数会对迭代器进行整合,不需要每次保存计算结果
(3),RDD依赖列表(A list of dependencies on other RDDs)
子RDD的每个分区和父RDD分区之间的数量上的对应关系
窄依赖:(NarrowDep)子RDD的每个分区依赖与父RDD的少量分区
One2OneDep
RangeDep
PruneDep
宽依赖:(shuffleDep)
RDD每次转换都会形成一个新的RDD,部分数据丢失时,Spark可以通过依赖关系重新计算丢失的分区数据,而不是重新计算
(4)kv类型的RDD分区器(A Partitioner for key—value RDDs RDD is hash-partitioned)
Spark实现了两种类型的分片函数,一种是基于hash partitioner,另一种是基于范围的RangePartitioner,只有对于key-value的Rdd,才会有Partitioner,非key-value的rdd的值是None,partitioner不但决定了RDD本身的分片数量,也决定了Partent RDD shuffle输出时的分片数量
(5)计算每个切片首选位置列表(a list of preferred locations on compute each split on)
这个位置列表记录了partitions所在的块的位置,按照移动数据不如移动计算的理念,尽可能将计算任务分配到所在的块位置
Task:
Spark中最小执行单位
Rdd每个分区对一个task
Task运行在节点上线程中
在spark中,Task的类型分为2种:ShuffleMapTask和ResultTask;
简单来说,DAG的最后一个阶段会为每个结果的partition生成一个ResultTask,即每个Stage里面的Task的数量是由该Stage中最后一个RDD的Partition的数量所决定的!而其余所有阶段都会生成ShuffleMapTask;之所以称之为ShuffleMapTask是因为它需要将自己的计算结果通过shuffle到下一个stage中;也就是说上图中的stage1和stage2相当于mapreduce中的Mapper,而ResultTask所代表的stage3就相当于mapreduce中的reducer。
在之前动手操作了一个wordcount程序,因此可知,Hadoop中MapReduce操作中的Mapper和Reducer在spark中的基本等量算子是map和reduceByKey;不过区别在于:Hadoop中的MapReduce天生就是排序的;而reduceByKey只是根据Key进行reduce,但spark除了这两个算子还有其他的算子;因此从这个意义上来说,Spark比Hadoop的计算算子更为丰富。
1.3 WordCound解Rdd
二 。Spark Rdd中的方法:Transformation(变换)和 Action(触发job提交)
2.1 Transformation(变换)
| 转换 | 含义 |
|---|---|
| map(fnc) | 返回一个新的RDD,该RDD由每一个输入元素经过函数转换后形成 |
| filter(fnc) | 返回一个新的Rdd,该RDD经过fnc计算后返回值为tue的元素构成 |
| flatMap(fnc) | 类似于map,但是每一个输入元素可以被映射为0或这多个输出元素(返回序列) |
| mapPartitions(fnc) | 类似于map,但独立地在Rdd的每一个分片上运行,因此在类型为 T的Rdd上运行是,fnc类型Iterator[T]=>Iterator[U] |
| mapPartitionsWithIndex(fnc) | 类似于mapPartitions,但是fnc带有一个整数参数标识分片的索引值,因此在类型为T的Rdd上运行时,fnc的函数类型(Int,Interarot[T])=>Iterator[U] |
| sample(withReplacement, fraction,seed) | 根据fraction指定的比例对数据进行采样,可以选择是否使用随机数替换,seed用于指定随机数生成器的种子 |
| union(otherDataset) | 对源Rdd和参数Rdd求并集返回一个新的Rdd ,将两个同型集合纵向串联起来,窄依赖 |
| intersection(otherDataset) | 对源Rdd和参数Rdd求交集返回新的Rdd。宽依赖 |
| distinct([numTasks]) | 对源Rdd去重后返回新的Rdd。宽依赖 |
| groupBykey([numTasks]) | 在一个(k v)的Rdd上调用,返回一个新的Rdd(K,Iterator[U ])的Rdd。改变Rdd类型,宽依赖,map端没有聚合 |
| reduceByKey(fnc,[numTasks]) | 在一个(k,v)的Rdd上调用,返回一个(k,v)的Rdd 使用指定的reduce函数,将相同的key的值聚合到一起,reduce任务个数可以通过第二个可选参数设置,改变Rdd类型,宽依赖 map端有聚合 |
| aggregateByKey(zeroValue)(seqOp,combOp,[numTasks]) | 先按照分区聚合,再进行总的聚合,每次都要跟初始值进行交流,例如:aggreByKey(0)(+,+)对k/v的Rdd操作 按key聚合,可以改变V类型,zeroV:U的初值 sepOp分区内聚合函数 combOp:分区间聚合函数 |
| sortByKey([ascending],[numTasks]) | 在一个(k,v)的Rdd上调用,K必须事先Ordered接口,返回一个按照K进行排序的(K,V)Rdd |
| sortBy(fnc,[ascending],[numTasks])) | 与sortByBey类似,更加灵活,第一个参数是根据什么排序, 第二个是怎么排序,false倒序,第三个排序分区数 |
| join(otherDataset,[numTasks]) | 相当于内连接(交集)。在类型为(k,v)和(k,w)的Rdd上调用,返回一个相同key对应的所有元素在一起的(k,(v,w))的Rdd。宽依赖 |
| leftOuterJoin | 左外宽依赖。leftOuterJoin类似于SQL中的左外关联left outer join,返回结果以前面的RDD为主,关联不上的记录为空。只能用于两个RDD之间的关联,如果要多个RDD关联,多关联几次即可。 |
| rightOuterJoin | 右外宽依赖。rightOuterJoin类似于SQL中的有外关联right outer join,返回结果以参数中的RDD为主,关联不上的记录为空。只能用于两个RDD之间的关联,如果要多个RDD关联,多关联几次即可 |
| fullOuterJoin | 全外宽依赖 |
| cartesian(otherDataset) | 笛卡尔积,交叉连接 |
| repartition(numPartitions) | 重新分区,必须shuffle=true,宽依赖 |
| coalesce(numPartitions) | 重新分区,控制是否进行shuffle处理,减少分区,优化使用该方法第一个参数是要分多少区,第二个参数是否shuffle 默认false 少分区变多分区 true 多分区变少分区 false |
2.2 Action(触发job执行)
| 方法 | 含义 |
|---|---|
| reduce(func) | 通过func函数聚集RDD中的所有元素 |
| collect() | 在驱动程序中,以数组的形式返回数据集的所有元素 |
| count() | 返回RDD的元素个数 |
| first() | 返回RDD的第一个元素(类似于take(1)) |
| take(n) | 返回一个由数据集的前n个元素组成的数组 |
| foeeach(func) | 在数据集的每一个元素上,运行函数func进行更新。 |
| saveAsTextFile(path) | 将数据集的元素以textfile的形式保存到HDFS文件系统或者其他支持的文件系统,对于每个元素,Spark将会调用toString方法,将它装换为文件中的文本 |
| takeSample(withReplacement,num, [seed]) | 返回一个数组,该数组由从数据集中随机采样的num个元素组成,可以选择是否用随机数替换不足的部分,seed用于指定随机数生成器种子 |
aggregateByKey(zeroValue)(seqOp,combOp,[numTasks]****:方法图解:
三.Spark WordCount编写
使用scala编写
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}
object WordCount3 {
def main(args: Array[String]): Unit = {
val conf = new SparkConf()
conf.setAppName(“WcScala”)
//conf.setMaster(“spark://s101/7077”) *//*创建上下文 val sc = new SparkContext(conf)
val rdd1: RDD[String] = sc.textFile(args(0))
*//*压扁 val rdd2: RDD[String] = rdd1.flatMap(line=>line.split(" "))
*//*标一 val rdd3: RDD[(String, Int)] = rdd2.map(word=>(word,1))
*//*化简 def cnt(a:Int,b:Int) = a + b
val rdd4: RDD[(String, Int)] = rdd3.reduceByKey(cnt _)
val arr:Array[(String,Int)] = rdd4.collect();
arr.foreach(e=>println(e))
}
}
使用java编写
package com.oldboy.javaspark;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.PairFunction;
import scala.Tuple2;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
/** *** 使用java实现spark的**word count **/*public class WordCountJava1 {
public static void main(String[] args) {
SparkConf conf = new SparkConf() ;
conf.setMaster(“local”) ;
conf.setAppName(“wcJava”) ;
JavaSparkContext sc = new JavaSparkContext(conf);
*//*加载文件 JavaRDD rdd1 = sc.textFile(“file:///d:/mr/hello10.txt”) ;
*//*压扁 JavaRDD rdd2 = rdd1.flatMap(new FlatMapFunction<String, String>() {
public Iterator call(String s) throws Exception {
return Arrays.asList(s.split(" ")).iterator();
}
}) ;
//标1**成对 JavaPairRDD<String, Integer> rdd3 = rdd2.mapToPair(new PairFunction<String, String, Integer>() {
public Tuple2<String, Integer> call(String s) throws Exception {
return new Tuple2<String, Integer>(s , 1);
}
}) ;
//按key**化简 JavaPairRDD<String, Integer> rdd4 = rdd3.reduceByKey(new Function2<Integer, Integer, Integer>() {
public Integer call(Integer v1, Integer v2) throws Exception {
return v1 + v2;
}
}) ;
List<Tuple2<String, Integer>> list= rdd4.collect() ;
for(Tuple2<String,Integer> t : list){
System.out.println(t);
}
}
}
四 宽窄依赖
宽依赖:shuffleDep 窄依赖:narrowDep
图中可知:
窄依赖:是指每个父Rdd的一个partition最多被子Rdd的一个partition所使用,例如:map、filter、unio等都会产生
宽依赖:是指一个父rdd的partition会被多个子Rdd的partition所使用,例如:guoupByKey,reduceByKey,sortBykey等
:如果父RDD的一个Partition被子RDD的一个Partition所使用就是窄依赖,否则的话就是宽依赖。因为是确定的partition数量的依赖关系,所以RDD之间的依赖关系就是窄依赖;由此我们可以得出一个推论:即窄依赖不仅包含一对一的窄依赖,还包含一对固定个数的窄依赖。
一对固定个数的窄依赖的理解:即子RDD的partition对父RDD依赖的Partition的数量不会随着RDD数据规模的改变而改变;换句话说,无论是有100T的数据量还是1P的数据量,在窄依赖中,子RDD所依赖的父RDD的partition的个数是确定的,而宽依赖是shuffle级别的,数据量越大,那么子RDD所依赖的父RDD的个数就越多,从而子RDD所依赖的父RDD的partition的个数也会变得越来越多。