Spark计算框架为了能够进行高并发和高吞吐的数据处理,封装了三大数据结构,用于处理不同的应用场景。三大数据结构分别是:
➢ RDD : 弹性分布式数据集
➢ 累加器:分布式共享只写变量
➢ 广播变量:分布式共享只读变量
RDD(Resilient Distributed Dataset)叫做弹性分布式数据集,是Spark中最基本的数据抽象。代码中是一个抽象类,它代表一个弹性的、不可变、可分区、里面的元素可并行计算的
集合。 A Resilient Distributed Dataset (RDD), the basic abstraction in Spark. Represents an immutable;可类比String,它也是不可变的,但是可有很多方法,如切分...
1. RDD的属性
每个属性对应一个方法,
- getPartitions: Array[Partition]、
- compute、
- getDependencies、
- Partitioner、
- getPreferredLocations(每个分区对应一个Task,把Task发送到哪个位置记录下来)
* Internally, each RDD is characterized by five main properties: * 1) - A list of partitions;
RDD数据结构中存在分区列表,用于执行任务时并行计算是分布式计算的重要属性,而且分区之间的数据是没有关联的互不影响。* 2) - A function for computing each split;
Spark在计算时,是使用分区函数对每一个分区进行计算,每个分区中的计算逻辑是一样的,只是数据不一样; * 3) - A list of dependencies on other RDDs;
RDD是计算模型的封装,当需求中需要将多个计算模型进行组合时,就需要将多个RDD建立依赖关系。 * 4) - Optionally, a Partitioner for key-value RDDs (e.g. to say that the RDD is hash-partitioned即Hash分区器)
分区器,当数据为KV类型时,可以通过设定分区器自定义数据的分区;只有键值对RDD才有分区器 * 5) - Optionally, a list of preferred locations to compute each split on (e.g. block locations for an HDFS file);
(preferred location)优先位置,(每个Task任务发送到离数据最近的位置--节点的Executor上),如果一个节点的Executor由于内存cpu等原因不能执行,
spark会对它有个降级,给同一个节点的另外一个Executor去执行,它如果还是不能执行就去同一个机架上的其他机器上的Executor(跨节点传输数据了),这又是一个降级;
如果同一个机架上的都不行,则给同一个机房的其他机架上发,又是一个降级;
移动数据不如移动计算;
分区并行计算 task
2. RDD特点
RDD表示只读的分区的数据集,对RDD进行改动,只能通过RDD的转换操作,由一个RDD得到一个新的RDD,新的RDD包含了从其他RDD衍生所必需的信息。RDDs之间存在依赖,
RDD的执行是按照血缘关系延时计算的。如果血缘关系较长,可以通过持久化RDD来切断血缘关系。
1)弹性
存储的弹性:内存与磁盘的自动切换;(可以基于内存也可以基于磁盘)
容错的弹性:数据丢失可以自动恢复;(RDD记录了数据怎么计算的,数据丢失了可在上一级自动恢复)
计算的弹性:计算出错重试机制;(Executor挂了,Driver可以转移到其他Executor)
分片的弹性:可根据需要重新分片。(总的数据量不变,分区数是可变的)
2)分区
RDD逻辑上是分区的,每个分区的数据是抽象存在的,计算的时候会通过一个compute函数得到每个分区的数据。如果RDD是通过已有的文件系统构建,则compute函数是读取(逻辑)指定文件系统中的数据,如果RDD是通过其他RDD转换而来,则compute函数是执行转换逻辑将其他RDD的数据进行转换。
3)只读
RDD是只读的,要想改变RDD中的数据,只能在现有的RDD基础上创建新的RDD。
由一个RDD转换到另一个RDD,可以通过丰富的操作算子实现,不再像MapReduce那样只能写map和reduce了。
RDD的操作算子包括两类,一类叫做transformations,它是用来将RDD进行转化,构建RDD的血缘关系(懒加载、懒执行,只有遇到action才会真正的执行);另一类叫做actions,它是用来触发RDD的计算,得到RDD的相关计算结果或者将RDD保存的文件系统中。
4)依赖
RDDs通过操作算子进行转换,转换得到的新RDD包含了从其他RDDs衍生所必需的信息,RDDs之间维护着这种血缘关系,也称之为依赖。依赖包括两种,一种是窄依赖,RDDs之间分区是一一对应(从上游RDD看)的,(上游的某一个分区被下游的一个或多个分区所使用);
另一种是宽依赖,下游RDD的每个分区与上游RDD(也称之为父RDD)的每个分区都有关,是多对多的关系。
窄依赖指的是每一个父RDD的Partition最多被子RDD的一个Partition使用,窄依赖我们形象的比喻为独生子女;
宽依赖指的是多个子RDD的Partition会依赖同一个父RDD的Partition,会引起shuffle,总结:宽依赖我们形象的比喻为超生
5)缓存
可以缓存到内存也可以缓存到磁盘,缓存没有删除依赖关系;任务执行完之后不管是缓存到内存还是磁盘,它都会被删除掉;
如果在应用程序中多次使用同一个RDD,可以将该RDD缓存起来,该RDD只有在第一次计算的时候会根据血缘关系得到分区的数据,在后续其他地方用到该RDD的时候,会直接从缓存处取而不用再根据血缘关系计算,这样就加速后期的重用。如下图所示,RDD-1经过一系列的转换后得到RDD-n并保存到hdfs,RDD-1在这一过程中会有个中间结果,如果将其缓存到内存,那么在随后的RDD-1转换到RDD-m这一过程中,就不会计算其之前的RDD-0了。
----->R5
R1--->R2--->R3----->R4 ,RDD3缓存到内存计算1次即可,这样子R5、R6从内存掉即可;缓存默认是没开启的,需要调方法;
----->R6
缓存有可能丢失,或者存储存储于内存的数据由于内存不足而被删除,RDD的缓存容错机制保证了即使缓存丢失也能保证计算的正确执行。通过基于RDD的一系列转换,丢失的数据会被重算,由于RDD的各个
Partition是相对独立的,因此只需要计算丢失的部分即可,并不需要重算全部Partition。
例如:
scala> val rdd = sc.makeRDD(Array("kris")) scala> val nocache = rdd.map(_.toString + System.currentTimeMillis) scala> nocache.collect res0: Array[String] = Array(kris1554979614968) scala> nocache.collect res1: Array[String] = Array(kris1554979627951) scala> nocache.collect res2: Array[String] = Array(kris1554979629257) scala> val cache = rdd.map(_.toString + System.currentTimeMillis).cache //将RDD转换为携带当前时间戳并做缓存 cache: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[2] at map at <console>:26 scala> cache.collect // 多次打印做了相同的缓存结果 res3: Array[String] = Array(kris1554979702053) scala> cache.collect res4: Array[String] = Array(kris1554979702053)