SparkContext解析

SparkContext是用户通往Spark集群的唯一入口,任何需要使用Spark的地方都需要先创建SparkContext,那么SparkContext做了什么?

首先SparkContext是在Driver程序里面启动的,可以看做Driver程序和Spark集群的一个连接,SparkContext在初始化的时候,创建了很多对象:

Spark----SparkContext解析

上图列出了SparkContext在初始化创建的时候的一些主要组件的构建。

SparkContext创建过程

创建过程如下:

Spark----SparkContext解析

SparkContext在新建时

  1. 内部创建一个SparkEnv,SparkEnv内部创建一个RpcEnv
    1. RpcEnv内部创建并注册一个MapOutputTrackerMasterEndpoint(该Endpoint暂不介绍)
  2. 接着创建DAGScheduler,TaskSchedulerImpl,SchedulerBackend
    1. TaskSchedulerImpl创建时创建SchedulableBuilder,SchedulableBuilder根据类型分为FIFOSchedulableBuilder,FairSchedulableBuilder两类
  3. 最后启动TaskSchedulerImpl,TaskSchedulerImpl启动SchedulerBackend
    1. SchedulerBackend启动时创建ApplicationDescription,DriverEndpoint, StandloneAppClient
    2. StandloneAppClient内部包括一个ClientEndpoint

SparkContext简易结构与交互关系

Spark----SparkContext解析

  1. SparkContext:是用户Spark执行任务的上下文,用户程序内部使用Spark提供的Api直接或间接创建一个SparkContext
  2. SparkEnv:用户执行的环境信息,包括通信相关的端点
  3. RpcEnv:SparkContext中远程通信环境
  4. ApplicationDescription:应用程序描述信息,主要包含appName, maxCores, memoryPerExecutorMB, coresPerExecutor, Command(
    CoarseGrainedExecutorBackend),  appUiUrl等
  5. ClientEndpoint:客户端端点,启动后向Master发起注册RegisterApplication请求
  6. Master:接受RegisterApplication请求后,进行Worker资源分配,并向分配的资源发起LaunchExecutor指令
  7. Worker:接受LaunchExecutor指令后,运行ExecutorRunner
  8. ExecutorRunner:运行applicationDescription的Command命令,最终Executor,同时向DriverEndpoint注册Executor信息

Master对Application资源分配

当Master接受Driver的RegisterApplication请求后,放入waitingDrivers队列中,在同一调度中进行资源分配,分配过程如下:

Spark----SparkContext解析

waitingApps与aliveWorkers进行资源匹配

  1. 如果waitingApp配置了app.desc.coresPerExecutor:
    1. 轮询所有有效可分配的worker,每次分配一个executor,executor的核数为minCoresPerExecutor(app.desc.coresPerExecutor),直到不存在有效可分配资源或者app依赖的资源已全部被分配
  2. 如果waitingApp没有配置app.desc.coresPerExecutor:
    1. 轮询所有有效可分配的worker,每个worker分配一个executor,executor的核数为从minCoresPerExecutor(为固定值1)开始递增,直到不存在有效可分配资源或者app依赖的资源已全部被分配
  3. 其中有效可分配worker定义为满足一次资源分配的worker:
    1. cores满足:usableWorkers(pos).coresFree - assignedCores(pos) >= minCoresPerExecutor,
    2. memory满足(如果是新的Executor):usableWorkers(pos).memoryFree - assignedExecutors(pos) * memoryPerExecutor >= memoryPerExecutor

注意:Master针对于applicationInfo进行资源分配时,只有存在有效可用的资源就直接分配,而分配剩余的app.coresLeft则等下一次再进行分配

Worker创建Executor

Spark----SparkContext解析

(图解:橙色组件是Endpoint组件)

 Worker启动Executor

  1. 在Worker的tempDir下面创建application以及executor的目录,并chmod700操作权限
  2. 创建并启动ExecutorRunner进行Executor的创建
  3. 向master发送Executor的状态情况

 ExecutorRnner

  1. 新线程【ExecutorRunner for [executorId]】读取ApplicationDescription将其中Command转化为本地的Command命令
  2. 调用Command并将日志输出至executor目录下的stdout,stderr日志文件中,Command对应的java类为CoarseGrainedExecutorBackend

 CoarseGrainedExecutorBackend

  1. 创建一个SparkEnv,创建ExecutorEndpoint(CoarseGrainedExecutorBackend),以及WorkerWatcher
  2. ExecutorEndpoint创建并启动后,向DriverEndpoint发送RegisterExecutor请求并等待返回
  3. DriverEndpoint处理RegisterExecutor请求,返回ExecutorEndpointRegister的结果
  4. 如果注册成功,ExecutorEndpoint内部再创建Executor的处理对象

至此,Spark运行任务的容器框架就搭建完成。

相关文章: