Spark集群分为Master节点和Worker节点,相当于Hadoop的Master和Slave节点。Master节点上常驻Master守护进程,负责管理全部的Worker节点。Worker节点上常驻Worker守护进程,负责与Master节点通信并管理executors。

Driver 官方解释是 The process running the main() function of the application and creating the SparkContext

  • 每个application包含一个driver和多个executors,每个executor里面运行的tasks都属于同一个application。
  • 每个 Worker 上存在一个或者多个ExecutorBackend进程。每个进程包含一个Executor对象,该对象持有一个线程池,每个线程可以执行一个task。
  • 在Standalone版本中,ExecutorBackend被实例化成CoarseGrainedExecutorBackend进程。
  • Worker通过持有ExecutorRunner对象来控制CoarseGrainedExecutorBackend的启停。

多线程模型

  • 每个节点上可以运行一个或多个Executor服务
  • 每个Executor配有一定数量的slot,表示该Executor中可以同时运行多少个ShuffleMapTask或者ReduceTask(就是运行多少task)
  • 每个Executor单独运行在一个JVM进程中,每个Task则是运行在Executor中的一个线程(Hadoop每个Task占据一个进程)
  • 同一个Executor内部的Task可共享内存,比如通过broadcast的文件或者数据结构只会在每个Executor中加载一次,而不会像MapReduce那样,每个Task加载一次;
  • Executor一旦启动后,将一直运行,且它的资源可以一直被Task复用,直到Spark程序运行完成后才释放退出

task的执行速度是跟每个Executor进程的core数量有直接关系的。一个core同一时间只能执行一个线程。而每个Executor进程上分配到的多个task,都是以每个task一个线程的方式并发运行的。如果core数量比较充足,而且分配到的task数量比较合理,那么通常来说,可以比较快速和高效地执行完这些task线程。

总体上看,Spark采用的是经典的scheduler/workers模式,每个Spark应用程序运行的第一步是构建一个可重用的资源池,然后在这个资源池里运行所有的ShuffleMapTask和ReduceTask,而MapReduce应用程序则不同,它不会构建一个可重用的资源池,而是让每个Task动态申请资源,且运行完后马上释放资源。

尽管Spark的多线程模型带来了很多好处,但同样存在不足,主要有:

  • 由于同节点上所有任务运行在一个进程中,因此,会出现严重的资源争用,难以细粒度控制每个任务占用资源。与之相反的是MapReduce,它允许用户单独为Map Task和Reduce Task设置不同的资源,进而细粒度控制任务占用资源量,有利于大作业的正常平稳运行。

多进程模型便于细粒度控制每个任务占用的资源,但会消耗较多的启动时间,不适合运行低延迟类型的作业,这是MapReduce广为诟病的原因之一。而多线程模型则相反,该模型使得Spark很适合运行低延迟类型的作业。总之,Spark同节点上的任务以多线程的方式运行在一个JVM进程中,可带来以下好处:

  • 任务启动速度快,与之相反的是MapReduce Task进程的慢启动速度,通常需要1s左右
  • 同Executor上所有任务运行在一个进程中,有利于共享内存。这非常适合内存密集型任务
  • 同节点上所有任务可运行在一个JVM进程(Executor)中,且Executor所占资源可连续被多批任务使用,不会在运行部分任务后释放掉,这避免 了每个任务重复申请资源带来的时间开销,对于任务数目非常多的应用,可大大降低运行时间。与之对比的是MapReduce中的Task:每个Task单独 申请资源,用完后马上释放,不能被其他任务重用,尽管1.0支持JVM重用在一定程度上弥补了该问题,但2.0尚未支持该功能。

异步并发模型

Spark的高性能一定程度上取决于它采用的异步并发模型,这与Hadoop-2.0是一致的。Hadoop-2.0自己实现了类似Actor的异步并发模型,实现方式是epoll+状态机,而Spark则直接采用了开源软件Akka,该软件实现了Actor模型,性能非常高(目前已经全部换成了netty,并用netty实现了一份与Actor模型类似的机制)。尽管二者在server端采用了一致的并发模型,但在任务级别(特指 Spark任务和MapReduce任务)上却采用了不同的并行机制:Hadoop MapReduce采用了多进程模型,而Spark采用了多线程模型

作业提交

提交一个Spark作业之后,这个作业就会启动一个对应的Driver进程。根据你使用的部署模式(deploy-mode)不同,Driver进程可能在本地启动,也可能在集群中某个工作节点上启动。Driver进程本身会根据我们设置的参数,占有一定数量的内存和CPU。而Driver进程要做的第一件事情,就是向集群管理器申请运行Spark作业需要使用的资源,这里的资源指的就是Executor进程。YARN集群管理器会根据我们为Spark作业设置的资源参数,在各个工作节点上,启动一定数量的Executor进程,每个Executor进程都占有一定数量的内存和CPU。

在申请到了作业执行所需的资源之后,Driver进程就会开始调度和执行我们编写的作业代码了。Driver进程会将我们编写的Spark作业代码分拆为多个stage,每个stage执行一部分代码片段,并为每个stage创建一批task,然后将这些task分配到各个Executor进程中执行。task是最小的计算单元,负责执行一模一样的计算逻辑(也就是我们自己编写的某个代码片段),只是每个task处理的数据不同而已。一个stage的所有task都执行完毕之后,会在各个节点本地的磁盘文件中写入计算中间结果,然后Driver就会调度运行下一个stage。下一个stage的task的输入数据就是上一个stage输出的中间结果。如此循环往复,直到将我们自己编写的代码逻辑全部执行完,并且计算完所有的数据,得到我们想要的结果为止。

宽/窄依赖

Spark是根据shuffle类算子来进行stage的划分。如果我们的代码中执行了某个shuffle类算子(比如reduceByKey、join等),那么就会在该算子处,划分出一个stage界限来。可以大致理解为,shuffle算子执行之前的代码会被划分为一个stage,shuffle算子执行以及之后的代码会被划分为下一个stage。因此一个stage刚开始执行的时候,它的每个task可能都会从上一个stage的task所在的节点,去通过网络传输拉取需要自己处理的所有key,然后对拉取到的所有相同的key使用我们自己编写的算子函数执行聚合操作(比如reduceByKey()算子接收的函数)。这个过程就是shuffle。

stage中会为每个需要计算的partition生成一个task,换句话说也就是每个task处理一个partition,默认情况下一个task对应cpu的一个核。如果一个executor可用cpu核数为8,那么一个executor中最多同是并发执行8个task

Task划分

Spark-概览

遇见宽依赖划分Stage(又提到Hadoop:Hadoop只有map-reduce,每次map-reduce都会落盘)

RDD之间有一系列的依赖关系,依赖关系又分为窄依赖和宽依赖。Spark中的Stage其实就是一组并行的任务,任务是一个个的task。

  • 窄依赖:父RDD和子RDD的partition之间的关系是一对一的。
  • 宽依赖(Shuffled):父RDD与子RDD partition之间的关系是一对多

宽依赖主要有两个过程: shuffle writeshuffle read. 类似Hadoop的Map和Reduce,shuffle write将ShuffleMapTask 任务产生的结果写入到磁盘中,shuffle read获得ShuffleMapTask缓存的中间结果进行ShuffleReduceTask计算,这个过程容易造成OutOfMemory

任务调度

Spark任务会根据RDD之间的依赖关系,形成一个DAG有向无环图,DAG会提交给DAGScheduler,DAGScheduler会把DAG划分相互依赖的多个stage,划分stage的依据就是RDD之间的宽窄依赖。遇到宽依赖就划分stage,每个stage包含一个或多个task任务。然后将这些task以taskSet的形式提交给TaskScheduler运行。stage是由一组并行的task组成。

Shuffle

shuffle过程内存分配使用ShuffleMemoryManager类管理,会针对每个Task分配内存,Task任务完成后通过Executor释放空间。这里可以把Task理解成不同key的数据对应一个Task。早期的内存分配机制使用公平分配,即不同Task分配的内存是一样的,但是这样容易造成内存需求过多的Task的OutOfMemory, 从而造成多余的磁盘IO过程,影响整体的效率。(比如某一个key下的数据明显偏多,但因为大家内存都一样,这一个key的数据就容易OutOfMemory)。1.5版以后 Task 共用一个内存池,分配机制如下:假如有N个Task,ShuffleMemoryManager保证每个Task溢出之前至少可以申请到1/2N内存,且至多申请到1/N,N为当前活动的shuffle Task数,因为N是一直变化的,所以 manager会一直追踪Task数的变化,重新计算队列中的1/N和1/2N.但是这样仍然容易造成内存需要多的Task任务溢出。

运行到每个stage的边界时,数据在shuffle write中写到磁盘上,而在子stage中通过shuffle read去读取数据。这些操作会导致很重的网络以及磁盘的I/O,所以 stage的边界是非常占资源的,在编写Spark程序的时候需要尽量避免的shuffle的产生。

RDD持久化

Spark中一个很重要的能力是将数据持久化(或称为缓存),任意操作都可以访问这些持久化的数据。当持久化一个RDD时,后续使用该RDD,不再需要像使用正常的RDD时要重新计算该RDD。RDD缓存在迭代计算中优势极大,如graphx、mlib

RDD可以使用persist()cache()进行持久化,必须要在trasnformation后使用直接连续调用cache()或者persist()才可以,如果先创建一个rdd,再单独另起一行执行cache()或者persist(),是没有用的,而且会报错,大量的文件会丢失。

数据将会在第一次action操作时进行计算,并缓存在节点的内存中。Spark 的缓存具有容错机制,如果一个缓存的RDD的某个分区丢失了,Spark将按照原来的计算过程,自动重新计算并进行缓存。

在 shuffle 操作中(例如 reduceByKey),即便是用户没有调用 persist 方法,Spark 也会自动缓存部分中间数据。这么做的目的是,在shuffle的过程中某个节点运行失败时,不需要重新计算所有的输入数据。如果用户想多次使用某个RDD,强烈推荐在该RDD上调用persist()方法。

当我们在代码中执行了cache/persist等持久化操作时,根据我们选择的持久化级别的不同,每个task计算出来的数据也会保存到Executor进程的内存或者所在节点的磁盘文件中。每个持久化的RDD可以使用不同的存储级别进行缓存:

  • 持久化到磁盘
  • 已序列化的Java对象形式持久化到内存(可以节省空间)
  • 以off-heap的方式存储在 Tachyon

以上这些存储级别通过传递一个StorageLevel对象给persist()方法进行设置。

  • MEMORY_ONLY : 将 RDD 以反序列化 Java 对象的形式存储在 JVM 中。如果内存空间不够,部分数据分区将不再缓存,在每次需要用到这些数据时重新进行计算。这是默认的级别。
  • MEMORY_AND_DISK : 将 RDD 以反序列化 Java 对象的形式存储在 JVM 中。如果内存空间不够,将未缓存的数据分区存储到磁盘,在需要使用这些分区时从磁盘读取。
  • MEMORY_ONLY_SER : 将 RDD 以序列化的 Java 对象的形式进行存储(每个分区为一个 byte 数组)。这种方式会比反序列化对象的方式节省很多空间,尤其是在使用 fast serializer时会节省更多的空间,但是在读取时会增加 CPU 的计算负担。
  • MEMORY_AND_DISK_SER : 类似于 MEMORY_ONLY_SER ,但是溢出的分区会存储到磁盘,而不是在用到它们时重新计算。
  • DISK_ONLY : 只在磁盘上缓存 RDD。
  • MEMORY_ONLY_2,MEMORY_AND_DISK_2,等等 : 与上面的级别功能相同,只不过每个分区在集群中两个节点上建立副本。
  • OFF_HEAP(实验中): 类似于 MEMORY_ONLY_SER ,但是将数据存储在 off-heap memory,这需要启动 off-heap 内存。

如何选择存储级别,Spark 的存储级别的选择,核心问题是在内存使用率和 CPU 效率之间进行权衡。建议按下面的过程进行存储级别的选择

  • 如果使用默认的存储级别(MEMORY_ONLY),存储在内存中的 RDD 没有发生溢出,那么就选择默认的存储级别。默认存储级别可以最大程度的提高 CPU 的效率,可以使在 RDD 上的操作以最快的速度运行。
  • 如果内存不能全部存储 RDD,那么使用 MEMORY_ONLY_SER,并挑选一个快速序列化库将对象序列化,以节省内存空间。使用这种存储级别,计算速度仍然很快。
  • 除了在计算该数据集的代价特别高,或者在需要过滤大量数据的情况下,尽量不要将溢出的数据存储到磁盘。因为,重新计算这个数据分区的耗时与从磁盘读取这些数据的耗时差不多。
  • 如果想快速还原故障,建议使用多副本存储级别(例如,使用 Spark 作为 web 应用的后台服务,在服务出故障时需要快速恢复的场景下)。所有的存储级别都通过重新计算丢失的数据的方式,提供了完全容错机制。但是多副本级别在发生数据丢失时,不需要重新计算对应的数据库,可以让任务继续运行。

Spark自动监控各个节点上的缓存使用率,并以最近最少使用的方式LRU将旧数据块移除内存。如果想手动移除一个RDD,而不是等待该RDD被Spark自动移除,可以使用unpersist()方法

相关文章: