Spark计算框架为了能够进行高并发和高吞吐的数据处理,封装了三大数据结构,用于处理不同的应用场景。三大数据结构分别是:

  ➢ RDD : 弹性分布式数据集

  ➢ 累加器:分布式共享只写变量

  ➢ 广播变量:分布式共享只读变量

RDD(Resilient Distributed Dataset)叫做弹性分布式数据集,是Spark中最基本的数据抽象。代码中是一个抽象类,它代表一个弹性的、不可变、可分区、里面的元素可并行计算的

集合。    A Resilient Distributed Dataset (RDD), the basic abstraction in Spark. Represents an immutable可类比String,它也是不可变的,但是可有很多方法,如切分...

1. RDD的属性

每个属性对应一个方法,

  • getPartitions: Array[Partition]、
  • compute、
  • getDependencies、
  • Partitioner、
  • getPreferredLocations(每个分区对应一个Task,把Task发送到哪个位置记录下来)
* Internally, each RDD is characterized by five main properties:
* 1) - A list of partitions;  
  RDD数据结构中存在分区列表,用于执行任务时并行计算是分布式计算的重要属性,而且分区之间的数据是没有关联的互不影响。
* 2) - A function for computing each split
  Spark在计算时,是使用分区函数对每一个分区进行计算,每个分区中的计算逻辑是一样的,只是数据不一样;
* 3) - A list of dependencies on other RDDs;
  RDD是计算模型的封装,当需求中需要将多个计算模型进行组合时,就需要将多个RDD建立依赖关系。
* 4) - Optionally, a Partitioner for key-value RDDs (e.g. to say that the RDD is hash-partitioned即Hash分区器)
  分区器,当数据为KV类型时,可以通过设定分区器自定义数据的分区;只有键值对RDD才有分区器
* 5) - Optionally, a list of preferred locations to compute each split on (e.g. block locations for an HDFS file);
(preferred location)优先位置,(每个Task任务发送到离数据最近的位置--节点的Executor上),如果一个节点的Executor由于内存cpu等原因不能执行,

spark会对它有个降级,给同一个节点的另外一个Executor去执行,它如果还是不能执行就去同一个机架上的其他机器上的Executor(跨节点传输数据了),这又是一个降级;
如果同一个机架上的都不行,则给同一个机房的其他机架上发,又是一个降级;

移动数据不如移动计算;

  分区并行计算 task

2. RDD特点

RDD表示只读的分区的数据集,对RDD进行改动,只能通过RDD的转换操作,由一个RDD得到一个新的RDD,新的RDD包含了从其他RDD衍生所必需的信息。RDDs之间存在依赖,

RDD的执行是按照血缘关系延时计算的。如果血缘关系较长,可以通过持久化RDD来切断血缘关系。

1)弹性

存储的弹性:内存与磁盘的自动切换;(可以基于内存也可以基于磁盘)

容错的弹性:数据丢失可以自动恢复;(RDD记录了数据怎么计算的,数据丢失了可在上一级自动恢复)

计算的弹性:计算出错重试机制;(Executor挂了,Driver可以转移到其他Executor)

分片的弹性:可根据需要重新分片。(总的数据量不变,分区数是可变的)

 2)分区

RDD逻辑上是分区的,每个分区的数据是抽象存在的,计算的时候会通过一个compute函数得到每个分区的数据。如果RDD是通过已有的文件系统构建,则compute函数是读取(逻辑)指定文件系统中的数据,如果RDD是通过其他RDD转换而来,则compute函数是执行转换逻辑将其他RDD的数据进行转换。

3)只读

  RDD是只读的,要想改变RDD中的数据,只能在现有的RDD基础上创建新的RDD。

由一个RDD转换到另一个RDD,可以通过丰富的操作算子实现,不再像MapReduce那样只能写map和reduce了。

RDD的操作算子包括两类,一类叫做transformations,它是用来将RDD进行转化,构建RDD的血缘关系(懒加载、懒执行,只有遇到action才会真正的执行);另一类叫做actions,它是用来触发RDD的计算,得到RDD的相关计算结果或者将RDD保存的文件系统中。

 4)依赖

RDDs通过操作算子进行转换,转换得到的新RDD包含了从其他RDDs衍生所必需的信息,RDDs之间维护着这种血缘关系,也称之为依赖。依赖包括两种,一种是窄依赖,RDDs之间分区是一一对应(从上游RDD看)的,(上游的某一个分区被下游的一个或多个分区所使用);

另一种是宽依赖,下游RDD的每个分区与上游RDD(也称之为父RDD)的每个分区都有关,是多对多的关系

窄依赖指的是每一个父RDD的Partition最多被子RDD的一个Partition使用,窄依赖我们形象的比喻为独生子女;

宽依赖指的是多个子RDD的Partition会依赖同一个父RDD的Partition,会引起shuffle,总结:宽依赖我们形象的比喻为超生

 5)缓存

可以缓存到内存也可以缓存到磁盘,缓存没有删除依赖关系;任务执行完之后不管是缓存到内存还是磁盘,它都会被删除掉;

如果在应用程序中多次使用同一个RDD,可以将该RDD缓存起来,该RDD只有在第一次计算的时候会根据血缘关系得到分区的数据,在后续其他地方用到该RDD的时候,会直接从缓存处取而不用再根据血缘关系计算,这样就加速后期的重用。如下图所示,RDD-1经过一系列的转换后得到RDD-n并保存到hdfs,RDD-1在这一过程中会有个中间结果,如果将其缓存到内存,那么在随后的RDD-1转换到RDD-m这一过程中,就不会计算其之前的RDD-0了。

                        ----->R5  
R1--->R2--->R3----->R4 ,RDD3缓存到内存计算1次即可,这样子R5、R6从内存掉即可;缓存默认是没开启的,需要调方法;
                        ----->R6

缓存有可能丢失,或者存储存储于内存的数据由于内存不足而被删除,RDD的缓存容错机制保证了即使缓存丢失也能保证计算的正确执行。通过基于RDD的一系列转换,丢失的数据会被重算,由于RDD的各个

Partition是相对独立的,因此只需要计算丢失的部分即可,并不需要重算全部Partition。

例如:

Spark |02 SparkCore| 算子
scala> val rdd = sc.makeRDD(Array("kris"))
scala> val nocache = rdd.map(_.toString + System.currentTimeMillis)
scala> nocache.collect
res0: Array[String] = Array(kris1554979614968)                               

scala> nocache.collect
res1: Array[String] = Array(kris1554979627951)

scala> nocache.collect
res2: Array[String] = Array(kris1554979629257)

scala> val cache = rdd.map(_.toString + System.currentTimeMillis).cache  //将RDD转换为携带当前时间戳并做缓存
cache: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[2] at map at <console>:26

scala> cache.collect // 多次打印做了相同的缓存结果
res3: Array[String] = Array(kris1554979702053)

scala> cache.collect
res4: Array[String] = Array(kris1554979702053)
View Code

相关文章:

  • 2021-11-06
  • 2021-06-16
  • 2021-06-10
  • 2021-11-05
  • 2021-11-11
猜你喜欢
  • 2021-07-09
  • 2022-12-23
  • 2021-10-20
  • 2022-12-23
  • 2021-10-07
  • 2022-01-05
  • 2021-06-11
相关资源
相似解决方案