-
job :由一个 rdd 的 action 触发的动作,可以简单的理解为,当你需要执行一个 rdd 的 action 的时候,会生成一个 job。Job简单讲就是提交给spark的任务。
-
stage : stage 是一个 job 的组成单位,就是说,一个 job 会被切分成 1 个或 1 个以上的 stage,然后各个 stage 会按照执行顺序依次执行。简单地说Stage是每一个job处理过程要分为的几个阶段。
-
task : 即 stage 下的一个任务执行单元,一般来说,一个 rdd 有多少个 partition,就会有多少个 task,因为每一个 task 只是处理一个 partition 上的数据。Task是任务运行的最小单位。最终是要以task为单位运行在executor中。
stage的task的数量是由输入文件的切片个数来决定的。在HDFS中不大于128m的文件算一个切片(默认128m)。通过算子修改了某一个rdd的分区数量,task数量也会同步修改。
一般情况下,我们一个task运行的时候,使用一个cores,理论上:每一个stage下有多少的分区,就有多少的task,task的数量就是我们任务的最大的并行度。实际上:最大的并行度,取决于我们的application任务运行时使用的executor拥有的cores的数量。task数量超过这个cores的总数先执行cores个数量的task,然后等待cpu资源空闲后,继续执行剩下的task。
RDD,即弹性分布式数据集,全称为Resilient Distributed Dataset,是一个容错的,并行的数据结构,可以让用户显式地 将数据存储到磁盘和内存中,并能控制数据的分区。同时,RDD还提供了一组非常丰富的操作来操作这些数据,如:map,flatMap,filter等转换操作,以及SaveAsTextFile,conutByKey等行动操作。记录了该 RDD 是通过哪些 Transformation 得到的,在计算机中使用 lineage 来表示这种血缘结构,lineage 形成一个有向无环图 DAG, 整个计算过程中,将不需要将中间结果落地到 HDFS 进行容错,加入某个节点出错,则只需要通过 lineage 关系重新计算即可.
PairedRDD:(键值对分布式数据集)Spark 为包含键值对类型的 RDD 提供了一些专有的操作,称为 pair RDD。它们提供了并行操作各个键或跨节点重新进行数据分组 的操作接口。 键值对 RDD 通常用来进行聚合计算。先通过一些初始 ETL(抽取、转 化、装载)操作来将数据转化为键值对形式。键值对 RDD 提供了一些新的操作接口 让用户控制键值对 RDD 在各节点上分布情况的高级特性:分区。 使用可控的分区方式把常被一起访问的数据放到同一个节点上,可以大大减少应用的通信 开销。这会带来明显的性能提升。Spark 为包含键值对类型的 RDD 提供了一些专有的操作,称为 pair RDD。它们提供了并行操作各个键或跨节点重新进行数据分组 的操作接口。 通常从一个 RDD 中提取某些字段(例如代表事件时间、用户 ID 或者其他标识符的字段), 使用这些字段作为 pair RDD 操作中的键。
执行一个Spark Application的main函数和创建Spark Context的进程,它包含了这个application的全部代码。Spark Application中的每个action会被Spark作为Job进行调度。每个Job是一个计算序列的最终结果,而这个序列中能够产生中间结果的计算就是一个stage。一个Job被拆分成若干个Stage,每个Stage执行一些计算,产生一些中间结果。它们的目的是最终生成这个Job的计算结果。而每个Stage是一个task set,包含若干个task。Task是Spark中最小的工作单元,在一个executor上完成一个特定的事情。
Spark集群Shuffle分为两部分:Mapper端和Reducer端。
Mapper端:通过Cache不断的把数据写入到文件系统中并汇报给Driver,Driver需知道把数据写在什么地方。
Reducer端:把相同的Key放在同一个Task中,并进行业务逻辑的操作。Reducer端抓数据的时候也有一个小的缓存区。
Shuffle是MapReduce框架中的必要环节,它是连接Map和Reduce的桥梁。Shuffle只可能产生于值为[k, v]的PairedRDD的操作中,其他RDD是不会产生Shuffle的。当Map的输出结果要被Reduce使用时,输出结果需要按key哈希,并且分发到每一个Reducer上去,这个过程就是shuffle。Shuffle过程涉及到磁盘的读写和网络的传输,因此shuffle性能的高低直接影响到了整个程序的运行效率。正因如此,shuffle是Spark调优,更普遍来说是MapReduce框架调优的关键。Shuffle 不可以避免是因为在分布式系统中的基本点就是把一个很大的的任务/作业分成一百份或者是一千份,这些文件在不同的机器上独自完成各自不同的部份,我们是针对整个作业要结果,所以在后面会进行汇聚,这个汇聚的过程的前一阶段到后一阶段以至网络传输的过程就叫 Shuffle。下一个 Stage 向上一个 Stage 要数据这个过程,我们就称之为 Shuffle,在 Spark 中为了完成 Shuffle 的过程会把真正的一个作业划分为不同的 Stage,这个Stage 的划分是跟据依赖关系去决定的,Shuffle 是整个 Spark 中最消耗性能的一个地方。Spark的RDD可以根据需要被分成多个区,称为partition,这些区被分布在多个Spark节点上,分区之间交换数据只能通过网络通信的方式。当分区的数据排布无法满足需要执行的函数要求时,比如reduceByKey等,数据会进行重新排布,这个过程称为Shuffle。
把相同 key 的元素聚集到同一个 partition 下,所以造成了数据在内存中的重新分布,即 shuffle 操作.shuffle 操作是 spark 中最耗时的操作,应尽量避免不必要的 shuffle.
宽依赖主要有两个过程: shuffle write 和 shuffle fetch. 类似 Hadoop 的 Map 和 Reduce 阶段.shuffle write 将 ShuffleMapTask 任务产生的中间结果缓存到内存中, shuffle fetch 获得 ShuffleMapTask 缓存的中间结果进行 ShuffleReduceTask 计算,这个过程容易造成OutOfMemory.
shuffle 过程内存分配使用 ShuffleMemoryManager 类管理,会针对每个 Task 分配内存,Task 任务完成后通过 Executor 释放空间.这里可以把 Task 理解成不同 key 的数据对应一个 Task. 早期的内存分配机制使用公平分配,即不同 Task 分配的内存是一样的,但是这样容易造成内存需求过多的 Task 的 OutOfMemory, 从而造成多余的 磁盘 IO 过程,影响整体的效率.(例:某一个 key 下的数据明显偏多,但因为大家内存都一样,这一个 key 的数据就容易 OutOfMemory).1.5版以后 Task 共用一个内存池,内存池的大小默认为 JVM 最大运行时内存容量的16%,分配机制如下:假如有 N 个 Task,ShuffleMemoryManager 保证每个 Task 溢出之前至少可以申请到1/2N 内存,且至多申请到1/N,N 为当前活动的 shuffle Task 数,因为N 是一直变化的,所以 manager 会一直追踪 Task 数的变化,重新计算队列中的1/N 和1/2N.但是这样仍然容易造成内存需要多的 Task 任务溢出,所以最近有很多相关的研究是针对 shuffle 过程内存优化的.
Shuffle 时会导政数据分布不均衡,也就是数据倾斜的问题。数据倾斜的问题会引申很多其他问题,比如,网络带宽、各重硬件故障、内存过度消耗、文件掉失。因为 Shuffle 的过程中会产生大量的磁盘 IO、网络 IO、以及压缩、解压缩、序列化和反序列化等等。
每一个MapTask阶段都会产生和下游ReduceTask个数相同的小文件和bucket,这样会导致内存需要存储大量的文件的描述符,针对于海量的小文件到磁盘,涉及大量的IO操作,同时由于JVM内存不足也会导致多次GC;在这些文件的传输过程中,会涉及序列化和反序列化的过程,这是比较消耗时间的,在ReduceTask拉去数据处理时,需要将数据存放在内存内,如果文件量足够大,内存无法存储,可能导致OOM错误。
内存中需要保存海量的文件操作句柄和临时缓存信息,如果数据处理规模比较庞大的话,内存不可承受,出现oom等问题。
如果因为业务需要,一定要使用shuffle操作,无法用map类的算子来替代,那么尽量使用可以map-side预聚合的算子。
所谓的map-side预聚合,说的是在每个节点本地对相同的key进行一次聚合操作,类似于MapReduce中的本地combiner。map-side预聚合之后,每个节点本地就只会有一条相同的key,因为多条相同的key都被聚合起来了。其他节点在拉取所有节点上的相同key时,就会大大减少需要拉取的数据数量,从而也就减少了磁盘IO以及网络传输开销。通常来说,在可能的情况下,建议使用reduceByKey或者aggregateByKey算子来替代掉groupByKey算子。因为reduceByKey和aggregateByKey算子都会使用用户自定义的函数对每个节点本地的相同key进行预聚合。而groupByKey算子是不会进行预聚合的,全量的数据会在集群的各个节点之间分发和传输,性能相对来说比较差。
比如如下两幅图,就是典型的例子,分别基于reduceByKey和groupByKey进行单词计数。其中第一张图是groupByKey的原理图,可以看到,没有进行任何本地聚合时,所有数据都会在集群节点之间传输;第二张图是reduceByKey的原理图,可以看到,每个节点本地的相同key数据,都进行了预聚合,然后才传输到其他节点上进行全局聚合。
宽窄依赖:
宽依赖:父RDD的分区被子RDD的多个分区使用 例如 groupByKey、reduceByKey、sortByKey等操作会产生宽依赖,会产生shuffle
窄依赖:父RDD的每个分区都只被子RDD的一个分区使用 例如map、filter、union等操作会产生窄依赖
spark划分stage的整体思路是:从后往前推,遇到宽依赖就断开,划分为一个stage;遇到窄依赖就将这个RDD加入该stage中。因此在图2中RDD C,RDD D,RDD E,RDDF被构建在一个stage中,RDD A被构建在一个单独的Stage中,而RDD B和RDD G又被构建在同一个stage中。
在spark中,Task的类型分为2种:ShuffleMapTask和ResultTask;简单来说,DAG的最后一个阶段会为每个结果的partition生成一个ResultTask,即每个Stage里面的Task的数量是由该Stage中最后一个RDD的Partition的数量所决定的!而其余所有阶段都会生成ShuffleMapTask;之所以称之为ShuffleMapTask是因为它需要将自己的计算结果通过shuffle到下一个stage中;也就是说图2中的stage1和stage2相当于mapreduce中的Mapper,而ResultTask所代表的stage3就相当于mapreduce中的reducer。
AuxiliaryService是NodeManager上的一个Service。AuxliaryService会在收到Application/Container初始化以及停止事件时,会进行相应的处理。
NodeManager中,可以有多个AuxiliaryServices。有一个叫做AuxServices,专门用于处理这些AuxiliaryServices。