Spark RDD是Spark 核心基石
RDD为什么是Spark的核心概念
Spark建立在统一抽象的RDD之上,使得Spark可以很容易扩展,比如 Spark Streaming、Spark SQL、Machine Learning、Graph都是在spark RDD上面进行的扩展。
RDD的概念
RDD是spark的核心,也是整个spark的架构基础,
RDD是弹性分布式集合(Resilient Distributed Datasets)的简称,
是分布式只读且已分区(多个partition组成)集合对象,Partition分区,
和Block数据块是一一对应的 。
这些集合是弹性的,如果数据集一部分丢失,则可以对它们进行重建。
优点:
RDD只能从持久存储或通过Transformations操作产生,相比于分布式共享内存(DSM)可以更高效实现容错,对于丢失部分数据分区只需根据它的lineage就可重新计算出来,而不需要做特定的Checkpoint
• 1、RDD的不变性,可以实现类Hadoop MapReduce的推测式执行。
• 2、RDD的数据分区特性,可以通过数据的本地性来提高性能,这和Hadoop MapReduce是一样的。
• 3、RDD都是可序列化的,在内存不足时可自劢降级为磁盘存储,把RDD存储于磁盘上,这时性能会有大的下降但不会差于现在的MapReduce。
• 4、批量操作:任务能够根据数据本地性 (data locality) 被分配,从而提高性能。
RDD五个特性
每个 RDD 都包含五部分信息,即数据分区的集合,能根据本地性快速访问到数据的偏好位置,依赖关系,计算方法,是否是哈希 / 范围分区的元数据
分区、最佳位置、依赖、 函数、分区策略
RDD的例子
val file = sc.textFile("hdfs://data/test.txt")
val data = file.flatMap(_.split(" ")).map((_,1)).reduceByKey(_ + _)
Spark 程序具体的流程图
1.客户端提交任务,初始化sparkContext之后,sc.textFile(“hdfs://“),去hdfs加载文件
2.加载的文件比如有300MB,在hdfs中就有3个block块,对应 RDD 里面的三个partition
3.加载的这个过程其实就是RDD的创建(MapPartitionsRDD)
4.数据被加载到不同的partition中,通过构建的stage(task set)然后提交到executor中去并行计算
5计算完成后输出结果
Spark计算流程之RDD的作用
可以看出整个计算流程都是基于RDD在做计算,从数据加载,即RDD的创建,中途的计算(stage的划分,RDD的操作,shuffle)。到最后结果的输出,整个计算流程都是由RDD在贯穿
RDD操作
RDD还提供了一组丰富的操作来操作这些数据,这种操作叫做算子。比如map、flatMap、filter、join、groupBy、reduceByKey等
RDD分类,分为创建算子、转换、缓存、执行
Transformation:转换算子,这类转换并不触发提交作业,完成作业中间过程处理。
Action:行动算子,这类算子会触发SparkContext提交Job作业。
RDD接口
RDD的本质特征
RDD之间依赖关系(DAG)
窄依赖
每个父RDD的分区都至多被一个子RDD的分区使用,即为OneToOneDependecies;
窄依赖是指父RDD的每个分区都只被子RDD的一个分区所使用,如map就是一种窄依赖
宽依赖
宽依赖就是指父RDD的分区被多个子RDD的分区所依赖,如join则会导致宽依赖
多个子RDD的分区依赖一个父RDD的分区,即为ShuffleDependency 。例如,map操作是一种窄依赖,而join操作是一种宽依赖(除非父RDD已经基于Hash策略被划分过了,co- partitioned)
stage的划分是Spark作业调度的关键一步,它基于DAG确定依赖关系,借此来划分stage,将依赖链断开,每个stage内部可以并行运行(划分关键点),整个作业按照stage顺序依次执行,最终完成整个Job。
窄依赖具体实现
所有的依赖都要实现trait Dependency[T]
窄依赖是有两种具体实现 OneToOneDependency 和 RangeDependency
宽依赖的实现
宽依赖的实现只有一种:ShuffleDependency
class ShuffleDependency[K, V, C] extends Dependency[Product2[K, V]] { … }
依赖划分好处
窄依赖相比宽依赖更高效资源消耗更少
允许在单个集群节点上流水线式执行,这个节点可以计算所有父级分区。例如,可以逐个元 素地依次执行filter操作和map操作。
例如基于一对一的关系,可以在filter之后执行map。
相反,宽依赖需要所有的父RDD数据可用并且数据已经通过类MapReduce的操作shuffle完 成。
在窄依赖中,节点失败后的恢复更加高效(窄依赖支持更高效的故障还原)。
因为只有丢失的父级分区需要重新计算,并且这些丢失的父级分区可以并行地在不同节点上 重新计算。
与此相反,在宽依赖的继承关系中,单个失败的节点可能导致一个RDD的所有先祖RDD中的 一些分区丢失,导致计算的重新执行。
对于宽依赖,一个结点的故障可能导致来自所有父RDD的分区丢失,因此就需要完全重新执行。因此对于宽依赖,Spark会在持有各个父分区的结点上,将中间数据持久化来简化故障还原
RDD-compute
分区计算
Spark对RDD的计算是以partition为最小单位的,并且都是对迭代器进行复合,不需要保存 每次的计算结果
RDD- partitioner
分区函数:目前spark中提供两种分区函数
HashPatitioner(哈希分区)
RangePatitioner(区域分区)
且partitioner只存在于(K,V)类型的RDD中,rdd本身决定了分区的数量
RDD- lineage
val lines = sc.textFile("hdfs://...")
// transformed RDDs
val errors = lines.filter(_.startsWith("ERROR"))
val messages = errors.map(_.split("\t")).map(r => r(1)) messages.cache()
// action 1 messages.filter(_.contains("mysql")).count()
// action 2 messages.filter(_.contains("php")).count()
RDD经过trans或action后产生一个新的RDD,RDD之间的通过lineage来表达依赖关系,
lineage是rdd容错的重要机制,rdd转换后的分区可能在转换前分区的节点内存中
RDD依赖关系的划分(stage)
RDD依赖关系的划分,RDD怎么被划分到一个stage里面?
1. 就是通过窄依赖和宽依赖来划分stage的
2.如果是窄依赖就他们放在一个Stage里面,遇到宽依赖就断开划分为另外一个stage
3.如workCount例子,就划分为了两个stage,在reduceByKey的时候断开了
4.如下图,遇到groupByKey断开,为一个stage1,map、union为窄依赖遇到join断开划分为stage2,其余划分为stage3
典型RDD特征
Spark 中内建的几个 RDD 举例来说
不同角度看RDD
Scheduler Optimizations(调度优化)
调度优化,在同样的Stage阶段内,基于内存进行迭代
Schedule
基于RDD编程(通过DataFrame、DateSet来编程)
1)RDD Objects生成DAG(有向无环图的依赖),图中示例的rdd1和rdd2之间经过join、groupBy等若干lazy级别的transform操作,生成DAG有向无环图,DAG用来描述任务之间的先后关系。
2)DAG作为输入递交给DAGScheduler。DAGScheduler根据DAG描述的任务内部的先后顺序和转换关系,进而被划分出不同的stage,得到了RDD内部的依赖关系,形成TaskSet。
3)TaskSet再由底层调度器TaskScheduler通过cluster Manager来分配任务,把任务交给Worker执行。如果出错了或者慢任务,TaskScheduler需要不断重试。
总结
RDD是Spark架构的基石。