Spark 的运行流程
1、Spark 的基本运行流程
1、构建 DAG
使用算子操作 RDD 进行各种 transformation 操作,最后通过 action 操作触发 Spark 作业运行。 提交之后 Spark 会根据转换过程所产生的 RDD 之间的依赖关系构建有向无环图。
2、DAG 切割
DAG 切割主要根据 RDD 的依赖是否为宽依赖来决定切割节点,当遇到宽依赖就将任务划分 为一个新的调度阶段(Stage)。每个 Stage 中包含一个或多个 Task。这些 Task 将形成任务集 (TaskSet),提交给底层调度器进行调度运行。
3、任务调度
每一个 Spark 任务调度器只为一个 SparkContext 实例服务。当任务调度器收到任务集后负责 把任务集以 Task 任务的形式分发至 Worker 节点的 Executor 进程中执行,如果某个任务失败, 任务调度器负责重新分配该任务的计算。
4、执行任务
当 Executor 收到发送过来的任务后,将以多线程(会在启动 executor 的时候就初始化好了 一个线程池)的方式执行任务的计算,每个线程负责一个任务,任务结束后会根据任务的类 型选择相应的返回方式将结果返回给任务调度器。
2、运行流程图解
1、构建 Spark Application 的运行环境(初始化 SparkContext),SparkContext 向资源管理器(可 以是 Standalone、Mesos 或 YARN)注册并申请运行 Executor 资源
2、资源管理器分配 Executor 资源并启动 StandaloneExecutorBackend,Executor 运行情况将 随着心跳发送到资源管理器上
3、SparkContext 构建成 DAG 图,将 DAG 图分解成 Stage,并把 Taskset 发送给 TaskScheduler。 Executor 向 SparkContext 申请 Task,TaskScheduler 将 Task 发放给 Executor 运行同时 SparkContext 将应用程序代码发放给 Executor
4、Task 在 Executor 上运行,运行完毕释放所有资源。
3、SparkContext 初始化
关于 SparkContext:
1、SparkContext 是用户通往 Spark 集群的唯一入口,可以用来在 Spark 集群中创建 RDD、累 加器 Accumulator 和广播变量 Braodcast Variable
2、SparkContext 也是整个 Spark 应用程序中至关重要的一个对象,可以说是整个应用程序 运行调度的核心(不是指资源调度)
3、SparkContext在实例化的过程中会初始化DAGScheduler、TaskScheduler和SchedulerBackend
4、SparkContext 会调用 DAGScheduler 将整个 Job 划分成几个小的阶段(Stage),TaskScheduler 会调度每个 Stage 的任务(Task)应该如何处理。另外,SchedulerBackend 管理整个集群中为这 个当前的应用分配的计算资源(Executor)
初始化流程:
4、Spark 运行架构特点
1、每个 Application 获取专属的 executor 进程,该进程在 Application 期间一直驻留,并以 多线程方式运行 tasks。这种 Application 隔离机制有其优势的,无论是从调度角度看(每个 Driver 调度它自己的任务),还是从运行角度看(来自不同 Application 的 Task 运行在不同的 JVM 中)。当然,这也意味着 Spark Application 不能跨应用程序共享数据,除非将数据写入 到外部存储系统。
2、Spark 与资源管理器无关,只要能够获取 executor 进程,并能保持相互通信就可以了。
3、提交 SparkContext 的 Client 应该靠近 Worker 节点(运行 Executor 的节点),最好是在同 一个 Rack 里,因为 Spark Application 运行过程中 SparkContext 和 Executor 之间有大量的信息 交换;如果想在远程集群中运行,最好使用 RPC 将 SparkContext提交给集群,不要远离 Worker 运行 SparkContext。
4、Task 采用了数据本地性和推测执行的优化机制。
5、DAScheduler
6、TaskScheduler
维护 task 和 executor 对应关系,executor 和物理资源对应关系,在排队的 task 和正在跑的 task。维护内部一个任务队列,根据 FIFO 或 Fair 策略,调度任务。
TaskScheduler 本身是个接口,spark 里只实现了一个 TaskSchedulerImpl,理论上任务调度可 以定制。
主要职能:
7、SchedulerBackend
在 TaskScheduler 下层,用于对接不同的资源管理系统,SchedulerBackend 是个接口,需要实 现的主要方法如下:
粗粒度:进程常驻的模式,典型代表是 Standalone 模式,Mesos 粗粒度模式,YARN
细粒度:Mesos 细粒度模式
这里讨论粗粒度模式,更好理解:CoarseGrainedSchedulerBackend。维护 executor 相关信息(包 括 executor 的地址、通信端口、host、总核数,剩余核数),手头上 executor 有多少被注册 使用了,有多少剩余,总共还有多少核是空的等等。
主要职能:
1、 Driver 端主要通过 actor 监听和处理下面这些事件:
RegisterExecutor(executorId, hostPort, cores, logUrls) 这是 executor 添加的来源,通常 worker 拉起、重启会触发 executor 的注册。 CoarseGrainedSchedulerBackend 把这些 executor 维护起来,更新内部的资源信息,比如总核 数增加。最后调用一次 makeOffer(),即把手头资源丢给 TaskScheduler 去分配一次,返回任 务描述回来,把任务 launch 起来。这个 makeOffer()的调用会出现在任何与资源变化相关的 事件中,下面会看到。
StatusUpdate(executorId, taskId, state, data) task 的状态回调。首先,调用 TaskScheduler.statusUpdate 上报上去。然后,判断这个 task 是 否执行结束了,结束了的话把 executor 上的 freeCore 加回去,调用一次 makeOffer()。
ReviveOffers 这个事件就是别人直接向 SchedulerBackend 请求资源,直接调用 makeOffer()。
KillTask(taskId, executorId, interruptThread) 这个 killTask 的事件,会被发送给 executor 的 actor,executor 会处理 KillTask 这个事件。
StopExecutors 通知每一个 executor,处理 StopExecutor 事件。
RemoveExecutor(executorId, reason) 从维护信息中,那这堆 executor 涉及的资源数减掉,然后调用 TaskScheduler.executorLost() 方法,通知上层我这边有一批资源不能用了,你处理下吧。TaskScheduler 会继续把 executorLost 的事件上报给 DAGScheduler,原因是 DAGScheduler 关心 shuffle 任务的 output location。DAGScheduler 会告诉 BlockManager 这个 executor 不可用了,移走它,然后把所有 的 stage 的 shuffleOutput 信息都遍历一遍,移走这个 executor,并且把更新后的 shuffleOutput 信息注册到 MapOutputTracker 上,最后清理下本地的 CachedLocationsMap。
2、reviveOffers()方法的实现。直接调用了 makeOffers()方法,得到一批可执行的任务描述, 调用 launchTasks。
3、launchTasks(tasks: Seq[Seq[TaskDescription]])方法。 遍历每个 task 描述,序列化成二进制,然后发送给每个对应的 executor 这个任务信息 如果 这个二进制信息太大,超过了9.2M(默认的akkaFrameSize 10M减去默认为akka留空的200K), 会出错,abort 整个 taskSet,并打印提醒增大 akka frame size。如果二进制数据大小可接受, 发送给 executor 的 actor,处理 LaunchTask(serializedTask)事件。
8、Executor
Executor 是 Spark 里的进程模型,可以套用到不同的资源管理系统上,与 SchedulerBackend 配合使用。
内部有个线程池,有个 running tasks map,有个 actor,接收上面提到的由 SchedulerBackend 发来的事件。