1. Spark架构及生态

通常当需要处理的数据量超过了单机尺寸(比如我们的计算机有4GB的内存,而我们需要处理100GB以上的数据)这时我们可以选择spark集群进行计算,有时我们可能需要处理的数据量并不大,但是计算很复杂,需要大量的时间,这是我们也可以选择利用spark集群强大的计算资源,并行化地计算,其架构示意图如下:

Spark架构和工作原理、RDD依赖关系、DAG、stage详解

Spark Core:包含Spark的基本功能;尤其是定义RDD的API、操作以及这两者上的动作。其他Spark的库都是构建在RDD和Spark Core之上的。
Spark SQL:提供通过Apache Hive的SQL变体Hive查询语言(HiveQL)与Spark进行交互的API。每个数据库表被当做一个RDD,Spark SQL查询被转换为Spark操作。
Spark Streaming:对实时数据流进行处理和控制。Spark Streaming允许程序能够像普通RDD一样处理实时数据
MLib:一个常用机器学习的算法库,算法被实现为对RDD的Spark操作。这个库包含可扩展的学习算法,比如分类、回归等需要对大量数据集进行迭代的操作
GraphX:控制图、并行图操作和计算的一组算法和工具的集合。GraphX扩展了RDD API,包含控制图、创建子图、访问路径上所有顶点的操作

Spark架构采用了分布式计算中的Master-Slave模型,Master是对应集群中的含有Master进程的节点,Slave是集群中含有Worker进程的节点。Master作为整个集群的控制器,负责整个集群的正常运行;Worker相当于是计算节点,接收主节点命令与进行状态汇报;Executor负责任务的执行;Client作为用户的客户端负责提交应用,Driver负责控制一个应用的执行,组成图如下:

Spark架构和工作原理、RDD依赖关系、DAG、stage详解

Spark集群部署后,需要在主节点和从节点分别启动Master进程和Worker进程,对整个集群进行控制。在一个Spark应用的执行过程中,Driver和Worker是两个重要角色。

Driver程序是应用逻辑执行的起点,负责作业的调度,即Task任务的分发,而多个Worker用来管理计算节点和创建Executor并行处理任务。在执行阶段,Driver会将Task和Task所依赖的file和jar序列化后传递给对应的Worker机器,同时Executor对相应数据分区的任务进行处理。

Spark的架构中的基本组件:

1、Cluster Manager:在standalone模式中即为Master主节点,控制整个集群,监控worker。在YARN模式中为资源管理器。
2、Worker:从节点,负责控制计算节点,启动Executor或者Driver。在YARN模式中为NodeManager,负责计算节点的控制。
3、Driver:运行Application的main()函数并创建SparkContext。
4、Executor:执行器,在worker node上执行任务的组件、用于启动线程池运行任务。每个Application拥有独立的一组Executor。
5、SparkContext:整个应用的上下文,控制应用的生命周期。
6、RDD:Spark的基础计算单元,一组RDD可形成执行的有向无环图RDD Graph。
7、DAG Scheduler:根据作业(task)构建基于Stage的DAG,并提交Stage给TaskScheduler。
8、TaskScheduler:将任务(task)分发给Executor执行。
9、SparkEnv:线程级别的上下文, 存储运行时的重要组件的引用。

2. Spark与Hadoop

1、Hadoop有两个核心模块,分布式存储模块HDFS和分布式计算模块MapReduce
Spark本身并没有提供分布式文件系统,因此Spark的分析大多依赖于Hadoop的分布式文件系统HDFS

2、Hadoop的MapReduce与Spark都可以进行数据计算,而相比于MapReduce,Spark的速度更快并且提供的功能更加丰富

关系如下图:

Spark架构和工作原理、RDD依赖关系、DAG、stage详解

3. Spark工作原理

3.1 运行流程及特点

spark运行流程图如下:

Spark架构和工作原理、RDD依赖关系、DAG、stage详解

1、构建Spark Application的运行环境,启动SparkContext
2、SparkContext向资源管理器(可以是Standalone,Mesos,Yarn)申请运行3、Executor资源,并启动StandaloneExecutorbackend
4、Executor向SparkContext申请Task
5、SparkContext将应用程序分发给Executor
6、SparkContext构建成DAG图,将DAG图分解成Stage、将Taskset发送给Task 7、Scheduler,最后由Task Scheduler将Task发送给Executor运行
8、Task在Executor上运行,运行完释放所有资源

3.2 Spark运行特点

1、每个Application获取专属的executor进程,该进程在Application期间一直驻留,并以多线程方式运行Task。这种Application隔离机制是有优势的,无论是从调度角度看(每个Driver调度它自己的任务),还是从运行角度看(来自不同Application的Task运行在不同JVM中),当然这样意味着Spark Application不能跨应用程序共享数据,除非将数据写入外部存储系统。

2、Spark与资源管理器无关,只要能够获取Executor进程,并能保持互相通信就可以了。

3、提交SparkContext的Client应该靠近Worker节点(运行Executor的节点),最好是在同一个Rack里,因为Spark Application运行过程中SparkContext和Executor之间有大量的信息互换。

4、Task采用了数据本地性和推测执行的优化机制。

3.3 常用术语

Application:Application都是指用户编写的Spark应用程序,其中包括一个Driver功能的代码和分布在集群中多个节点上运行的Executor代码

Driver:Spark中的Driver即运行上述Application的main函数并创建SparkContext,创建SparkContext的目的是为了准备Spark应用程序的运行环境,在Spark中有SparkContext负责与ClusterManager通信,进行资源申请、任务的分配和监控等,当Executor部分运行完毕后,Driver同时负责将SparkContext关闭,通常用SparkContext代表Driver

Executor:某个Application运行在worker节点上的一个进程,该进程负责运行某些Task,并且负责将数据存到内存或磁盘上,每个Application都有各自独立的一批Executor,在Spark on Yarn模式下,其进程名称为CoarseGrainedExecutor Backend。一个CoarseGrainedExecutor Backend有且仅有一个Executor对象,负责将Task包装成taskRunner,并从线程池中抽取一个空闲线程运行Task,这个每一个CoarseGrainedExecutor Backend能并行运行Task的数量取决于分配给它的cup个数

Cluster Manager:指的是在集群上获取资源的外部服务。目前有三种类型:

1、standalone:spaark原生的资源管理,由Master负责资源的分配
2、Apache Mesos:与hadoop MR兼容性良好的一种资源调度框架
3、Hadoop Yarn:主要指Yarn中的ResourceManager

Worker:集群中任何可以运行Application代码的节点,在Standalone模式中指的是通过slave文件配置的Worker节点,在Spark on Yarn模式下就是NoteManager节点

Task:被送到某个Executor上的工作单元,但HadoopMR中的MapTask和ReduceTask概念一样,是运行Application的基本单位,多个Task组成一个Stage,而Task的调度和管理等是由TaskScheduler负责

Job:包含多个Task组成的并行计算,往往由Spark Action触发生成,一个Application中往往会产生多个Job

Stage:每个Job会被拆分成多组Task,作为一个TaskSet,其名称为Stage,Stage的划分和调度是有DAGScheduler来负责的,Stage有非最终的Stage(Shuffle Map Stage)和最终的Stage(Result Stage)两种,Stage的边界就是发生Shuffle的地方

DAGScheduler:根据Job构建基于Stage的DAG(Directed Acyclic Graph有向无环图),并提交Stage给TASKScheduler。其划分Stage的根据是RDD之间的依赖的关系找出开销最小的调度方法,如下图

Spark架构和工作原理、RDD依赖关系、DAG、stage详解

TASKScheduler:将TaskSET提交给Worker运行,每个Executor运行什么Task就是在此处分配的,TaskScheduler维护所有TaskSet,当Executor向Driver发生心跳时,TaskScheduler会根据资源剩余情况分配相应的Task。另外TaskScheduler还维护着所有Task的运行标签,重试失败的Task,下图展示了TaskScheduler的作用:

Spark架构和工作原理、RDD依赖关系、DAG、stage详解

在不同运行模式中任务调度器具体为:

1、Spark on Standalone模式为TaskScheduler
2、YARN-Client模式为YarnClientChusterScheduler
3、YARN-Chuster模式为YarnClusterScheduler

将这些术语串起来的运行层次图如下:

Spark架构和工作原理、RDD依赖关系、DAG、stage详解

Job=多个Stage,Stage-多个同种task,Task分为ShuffleMapTask和ResultTask,Dependency分为ShuffleDependency和NarrowDependency

3.4 Spark运行模式

Spark的运行模式多种多样,灵活多变,部署在单机上时,既可以用本地模式运行,也可以用伪分布模式运行,而当以分布式集群方式部署时,也有众多的运行模式可供选择,这取决于集群的实际情况,底层的资源调度即可以依赖外部资源调度框架,也可以使用Spark内建的Standalone模式。

对于外部资源调度框架的支持,目前的实现包括相对稳定的Mesos模式,以及hadoop YARN模式。

本地模式:常用于本地开发测试,本地还分为local和local cluster

3.4.1 standalone:独立集群运行模式

Standalone模式使用Spark自带的资源调度框架

采用Master/Slaver的典型架构,选用ZooKeeper来实现Master的HA

Spark架构和工作原理、RDD依赖关系、DAG、stage详解

该模式主要的节点有Client节点、Master节点和Worker节点。其中Driver既可以运行在Master节点上,也可以运行在本地Client端。当用spark-shell交互工具提交Spark的Job时,Driver在Master节点上运行;

当使用spark-submit工具提交Job或者在Eclip、IDEA等开发平台上使用“new SparkConf.setManager(“spark://master:7077”)”方式运行Spark任务时,Driver是运行在本地Client端上的。

运行过程如下图:(参考至:http://blog.csdn.net/gamer_gyt/article/details/51833681)

Spark架构和工作原理、RDD依赖关系、DAG、stage详解

1、SparkContext连接到Master,向Master注册并申请资源(CPU Core和Memory)

2、Master根据SparkContext的资源申请要求和Worker心跳周期内报告的信息决定在哪个Worker上分配资源,然后在该Worker上获取资源,然后启动StandaloneExecutorBackend;

3、StandaloneExecutorBackend向SparkContext注册;

4、SparkContext将Application代码发送给StandaloneExecutorBackend;并且SaprkContext解析Application代码,构建DAG图,并提交给DAG Scheduler分解成Stage(当碰到Action操作时,就会催生Job;每个Job中含有1个或多个Stage,Stage一般在获取外部数据和shuffle之间产生),然后以Stage(或者称为TaskSet)提交给Task Scheduler,Task Scheduler负责将Task分配到相应的Worker,最后提交给StandaloneExecutorBackend执行;

5、StandaloneExecutorBackend会建立Executor线程池,开始执行Task,并向SparkContext报告,直至Task完成

6、所有Task完成后,SparkContext向Maxter注销,释放资源

3.5 RDD运行流程

RDD在Spark中运行大概分为以下三步:

1、创建RDD对象

2、DAGScheduler模块介入运算,计算RDD之间的依赖关系,RDD之间的依赖关系就形成了DAG

3、每一个Job被分为多个Stage。划分Stage的一个主要依据是当前计算因子的输入是否是确定的,如果是则将其分在同一个Stage,避免多个Stage之间的消息传递开销

示例图如下:

Spark架构和工作原理、RDD依赖关系、DAG、stage详解

以下面一个按A-Z首字母分类,查找相同首字母下不同姓名总个数的例子来看一下RDD是如何运行起来的

Spark架构和工作原理、RDD依赖关系、DAG、stage详解

创建RDD上面的例子除去最后一个collect是个动作,不会创建RDD之外,前面四个转换都会创建新的RDD。因此第一步就是创建好所有RDD(内部的五项信息)?

创建执行计划Spark会尽可能的管道化,并基于是否要重新组织数据来划分阶段(stage),例如本例中的groupBy()转换就会将整个执行计划分成两个阶段执行。最终会产生一个DAG(directed Acyclic graph,有向无环图)作为逻辑执行计划

Spark架构和工作原理、RDD依赖关系、DAG、stage详解

调度任务 将各个阶段分成不同的任务(task),每个任务都是数据和计算的合体。在进行下一阶段前,当前阶段的所有任务都要执行完成。因为下一阶段的第一个转换一定是重新组织数据的,所有必须等当前阶段所有结果数据都计算出来了才能继续。

4. RDD依赖关系

RDD依赖关系,也就是有依赖的RDD之间的关系,比如RDD1------->RDD2(RDD1生成RDD2),RDD2依赖于RDD1。这里的生成也就是RDDtransformation操作

窄依赖(也叫narrow依赖)

从父RDD角度看:一个父RDD只被一个子RDD分区使用。父RDD的每个分区最多只能被一个Child RDD的一个分区使用

从子RDD角度看:依赖上级RDD的部分分区 精确知道依赖的上级RDD分区,会选择和自己在同一节点的上级RDD分区,没有网络IO开销,高效。如map,flatmap,filter

宽依赖(也叫shuffle依赖/wide依赖)

从父RDD角度看:一个父RDD被多个子RDD分区使用。父RDD的每个分区可以被多个Child RDD分区依赖

从子RDD角度看:依赖上级RDD的所有分区 无法精确定位依赖的上级RDD分区,相当于依赖所有分区(例如reduceByKey) 计算就涉及到节点间网络传输

Spark架构和工作原理、RDD依赖关系、DAG、stage详解

Spark之所以将依赖分为narrow和 shuffle:

(1) narrow dependencies可以支持在同一个集群Executor上,以pipeline管道形式顺序执行多条命令,例如在执行了map后,紧接着执行filter。分区内的计算收敛,不需要依赖所有分区的数据,可以并行地在不同节点进行计算。所以它的失败恢复也更有效,因为它只需要重新计算丢失的parent partition即可,

(2)shuffle dependencies 则需要所有的父分区都是可用的,必须等RDD的parent partition数据全部ready之后才能开始计算,可能还需要调用类似MapReduce之类的操作进行跨节点传递。从失败恢复的角度看,shuffle dependencies 牵涉RDD各级的多个parent partition。

Spark架构和工作原理、RDD依赖关系、DAG、stage详解

5. 划分stage

由于shuffle依赖必须等RDD的parent RDD partition数据全部ready之后才能开始计算,因此spark的设计是让parent RDD将结果写在本地,完全写完之后,通知后面的RDD。后面的RDD则首先去读之前的本地数据作为input,然后进行运算。

由于上述特性,将shuffle依赖就必须分为两个阶段(stage)去做:

第一个阶段(stage)需要把结果shuffle到本地,例如reduceByKey,首先要聚合某个key的所有记录,才能进行下一步的reduce计算,这个汇聚的过程就是shuffle

第二个阶段(stage)则读入数据进行处理。

同一个stage里面的task是可以并发执行的,下一个stage要等前一个stage ready

(和mapreduce的reduce需要等map过程ready 一脉相承)

(为什么要写在本地:后面的RDD多个partition都要去读这个信息,如果放到内存,如果出现数据丢失,后面的所有步骤全部不能进行,违背了之前所说的需要parent RDD partition数据全部ready的原则。为什么要保证parent RDD要ready,如下例,如果有一个partition未生成或者在内存中丢失,那么直接导致计算结果是完全错误的:

Spark架构和工作原理、RDD依赖关系、DAG、stage详解

写到文件中更加可靠。Shuffle会生成大量临时文件,以免错误时重新计算,其使用的本地磁盘目录由spark.local.dir指定,缓存到磁盘的RDD数据。最好将这个属性设定为访问速度快的本地磁盘。可以配置多个路径到多个磁盘,增加IO带宽

在Spark 1.0 以后,SPARK_LOCAL_DIRS(Standalone, Mesos) or LOCAL_DIRS (YARN)参数会覆盖这个配置。比如Spark On YARN的时候,Spark Executor的本地路径依赖于Yarn的配置,而不取决于这个参数。)

对于transformation操作,以shuffle依赖为分隔,分为不同的Stages。

窄依赖------>tasks会归并在同一个stage中,(相同节点上的task运算可以像pipeline一样顺序执行,不同节点并行计算,互不影响)

shuffle依赖------>前后拆分为两个stage,前一个stage写完文件后下一个stage才能开始

action操作------>和其他tasks会归并在同一个stage(在没有shuffle依赖的情况下,生成默认的stage,保证至少一个stage)

6. DAG

6.1 什么是DAG

DAG,全称 Directed Acyclic Graph, 中文为:有向无环图。

在 Spark 中, 使用 DAG 来描述我们的计算逻辑。

DAG 是一组顶点和边的组合。顶点代表了 RDD, 边代表了对 RDD 的一系列操作。

DAG Scheduler 会根据 RDD 的 transformation 动作,将 DAG 分为不同的 stage,每个 stage 中分为多个 task,这些 task 可以并行运行。

Spark架构和工作原理、RDD依赖关系、DAG、stage详解

6.2 DAG 解决了什么问题

DAG 的出现主要是为了解决 Hadoop MapReduce 框架的局限性。那么 MapReduce 有什么局限性呢?

主要有两个:

每个 MapReduce 操作都是相互独立的,HADOOP不知道接下来会有哪些Map Reduce。
每一步的输出结果,都会持久化到硬盘或者 HDFS 上。
当以上两个特点结合之后,我们就可以想象,如果在某些迭代的场景下,MapReduce 框架会对硬盘和 HDFS 的读写造成大量浪费。

而且每一步都是堵塞在上一步中,所以当我们处理复杂计算时,会需要很长时间,但是数据量却不大。

所以 Spark 中引入了 DAG,它可以优化计算计划,比如减少 shuffle 数据。

6.3 DAG 是怎么工作的

工作流程:

1、解释器是第一层。Spark 通过使用Scala解释器,来解释代码,并会对代码做一些修改。
2、在Spark控制台中输入代码时,Spark会创建一个 operator graph, 来记录各个操作。
3、当一个 RDD 的 Action 动作被调用时, Spark 就会把这个 operator graph 提交到 DAG scheduler 上。
4、DAG Scheduler 会把 operator graph 分为各个 stage。 一个 stage 包含基于输入数据分区的task。DAG scheduler 会把各个操作连接在一起。
5、这些 Stage 将传递给 Task Scheduler。Task Scheduler 通过 cluster manager 启动任务。Stage 任务的依赖关系, task scheduler 是不知道的。
6、在 slave 机器上的 Worker 们执行 task。

Spark架构和工作原理、RDD依赖关系、DAG、stage详解

参考文章(侵删):
1、https://www.cnblogs.com/Mayny/p/9330436.html
2、https://my.oschina.net/u/3703858/blog/2251353
3、https://waltyou.github.io/Spark-DAG/

相关文章:

  • 2021-07-14
  • 2021-07-27
  • 2022-01-12
  • 2021-08-24
  • 2021-07-01
  • 2021-08-16
  • 2021-08-12
  • 2021-04-07
猜你喜欢
  • 2021-10-10
  • 2021-12-26
  • 2021-12-30
  • 2021-04-05
  • 2021-12-05
  • 2021-07-08
  • 2021-08-05
相关资源
相似解决方案