本例以wc 为例子 (akka 为rpc 通讯框架)

1、我们进入savaAsTextFile 这个 action算子
spark 源码分析(spark 触发Action 之后操作)

2、上面算子通过多层封装(其中包换数据 转换和创建一些读写流) 最后通过 runjob 方法的入口开始提交任务(其中包含DAG 任务的划分流程等将DAG 切分成多个stage 然后将stage 切分成不同task 任务最后提交任务)
spark 源码分析(spark 触发Action 之后操作)

经过一系列的变换操作(通常增加一些提交任务逻辑的一些环境和变量等)调用dagSchedule 的 runjob 方法
spark 源码分析(spark 触发Action 之后操作)

最后将任务放在DAGSchedulerEventProcessLoop的阻塞队列中(等前面初始化好的方法调用这个方法)
spark 源码分析(spark 触发Action 之后操作)

注意:其实 spark 中 初始化sparkContext 时候下的 new DAGScheduler(this) 事已经启动了一个线程一值等待(一旦DAGSchedulerEventProcessLoop中有了相应的任务之后 然后才进行走自己的切分 stage等逻辑)

具体流程参考: newsparkContext -> new DAGScheduler(this) -> eventProcessLoop.start() 方法

spark 源码分析(spark 触发Action 之后操作)

spark 源码分析(spark 触发Action 之后操作)
spark 源码分析(spark 触发Action 之后操作)

由上面可知提交任务类型匹配JobSubmitted 方法
spark 源码分析(spark 触发Action 之后操作)

然后接下来就是 DAG 切分逻辑和真正的任务提交逻辑(具体DAG的切分并且提交任务逻辑如下)

通过最后罗杰rdd 推算出最后的 stage ()

spark 源码分析(spark 触发Action 之后操作)

其中 :根据rdd 类型进行切分
spark 源码分析(spark 触发Action 之后操作)

通过回溯的方式拿到逻辑的的一个stage

spark 源码分析(spark 触发Action 之后操作)

然后区分完是否集群模式之后 提交DAG 切分完成后的第一个stage

spark 源码分析(spark 触发Action 之后操作)

判断为是第一个 stage 之后 将stage 提交的 taskScheduler
spark 源码分析(spark 触发Action 之后操作)

判断完了 stage 结果的方式之后先将任务中环境等信息初始好后将任务分发到各个 excuter 中(上文中 excuter 已经建好了线程池 只要一有任务进来就启动excutor 中线程)其中 locs 方法返回服务器和分区之间的关系(逻辑移动到数据所在机器 用到)

注意:excutor 中已经启动好线程逻辑 参考:https://blog.csdn.net/weixin_40809627/article/details/103400396
spark 源码分析(spark 触发Action 之后操作)

利用timer定时器定时将任务提交到excutor 中 具体逻辑 待分析(todo )
spark 源码分析(spark 触发Action 之后操作)

相关文章:

  • 2021-12-28
  • 2021-05-16
  • 2021-10-15
  • 2021-10-12
  • 2021-09-01
  • 2021-12-18
  • 2021-08-11
  • 2021-09-29
猜你喜欢
  • 2021-08-23
  • 2021-06-24
  • 2021-12-18
  • 2021-12-18
  • 2021-12-18
  • 2021-05-01
  • 2021-08-12
相关资源
相似解决方案