文章目录
Spark简介
Spark简介
- 什么是Spark?
- Spark是基于内存计算的通用大规模数据处理框架
- Spark已经融入了Hadoop生态系统,可支持的作业类型和应用场景比MapReduce更为广泛,并且具备了MapReduce所有的高容错性和高伸缩性特点。
为何会诞生Spark?
- 回顾MapReduce
- 并不是所有的问题都可以简单的分解成Map和Reduce两步模型处理
- 并不是所有的问题都可以简单的分解成Map和Reduce两步模型处理
- MapReduce缺点
- 延时高 ✗
- Example:不适合交互式SQL分析
- 迭代计算力不从心 ✗
- Example:斐波那契数列
- 流式数据处理 ✗
- Example:统计网站PV、UV数据
- 延时高 ✗
- Spark
- 一站式解决
- 离线批处理 ✓
- 流式计算 ✓
- 在线实时分析 ✓
- 一站式解决
Spark为何快?
MapReduce
- MapReduce会将中间结果输出到本地磁盘
- 例如Shuffle时Map输出的中间结果
- 例如Shuffle时Map输出的中间结果
- 有多个MapReduce任务串联时,依赖HDFS存储中间结果的输出
- 例如执行Hive查询
- 例如执行Hive查询
- MapReduce在处理复杂DAG时会带来大量的数据copy、序列化和磁盘I/O开销
Spark
- Spark尽可能减少中间结果写入磁盘
- 尽可能减少不必要的Sort/Shuffle
- 反复用到的数据进行Cache
- 对于DAG进行高度优化
- 划分不同的Stage
- 使用延迟计算技术
Spark特点
- 内存计算
- 支持复杂查询、流式计算、机器学习、图计算
- 融入Hadoop生态圈
- 兼容HDFS
- 兼容Yarn
- 核心代码由Scala编写
- 发展速度快
- 社区活跃
- 最新版本2.4.0 (截止2018年2月)
Spark多语言支持
Spark体系结构和源代码解析
弹性分布式数据集RDD
- Spark将数据缓存在分布式内存中
- 如何实现?RDD
- Spark的核心
- 分布式内存抽象
- 提供了一个高度受限的共享内存模型
- 逻辑上集中但是物理上是存储在集群的多台机器上
RDD 属性和特点
- 只读
- 通过HDFS或者其它持久化系统创建RDD
- 通过transformation将父RDD转化得到新的RDD
- RDD上保存着前后之间依赖关系
- Partition
- 基本组成单位,RDD在逻辑上按照Partition分块
- 分布在各个节点上
- 分片数量决定并行计算的粒度
- RDD中保存如何计算每一个分区的函数
- 容错
- 失败自动重建
- 如果发生部分分区数据丢失,可以通过依赖关系重新计算
RDD.scala 解析
RDD.scala是所有RDD的总得抽象
RDD Example
val lines = sc.textFile(…)
lines.filter(x => x.contains(“Error”)).count()
宽依赖和窄依赖
- 窄依赖
- 没有数据shuffling
- 所有父RDD中的Partition均会和子RDD的Partition关系是一对一
宽依赖和窄依赖
- 宽依赖
- 有数据shuffling
- 所有父RDD中的Partition会被切分,根据key的不同划分到子RDD的Partition中
Stage
- 什么是Stage
- 一个Job会被拆分为多组Task,每组Task被称为一个Stage
- 划分依据
- 以shuffle操作作为边界,遇到一个宽依赖就分一个stage
- 以shuffle操作作为边界,遇到一个宽依赖就分一个stage
Stage执行优化
- 对窄依赖可以进行流水线(pipeline)优化
- 不互相依赖的Stage可以并行执行
- 存在依赖的Stage必须在依赖的Stage执行完之后才能执行
- Stage并行执行程度取决于资源数
Spark执行流程
- 用户创建Spark程序并提交
- 每个Action会生成一个Job
- 包含了一系列RDD以及如何对其进行转换transformation
- 对每个Job生成DAG
- Directed Acyclic Graph
- 对根据宽窄依赖对DAG进行划分Stage
- 对每一个Stage生成一组Task
- 一个Partition对应一个Task
- Spark会以一组Task为单位进行执行计算
Spark执行流程
Yarn资源调度过程
Spark on Yarn
- Yarn
- ResourceManager:负责整个集群资源管理和分配
- ApplicationMaster:Yarn中每个Application对应一个AM,负责与
ResrouceManager协商获取资源,并告知NodeManager分配启动Container - NodeManager:每个节点的资源和任务管理器,负责启动Container,并监视资源使用情况
- Container:资源抽象
- Spark
- Application:用户自己编写的Spark程序
- Driver:运行Application的main函数并创建SparkContext,和ClusterManager通信申请资源,任务分配并监控运行情况
- ClusterManager:指的是Yarn
- DAGScheduler:对DAG图划分Stage
- TaskScheduler:把TaskSet分配给具体的Executor
- Spark支持三种运行模式
- standalon, yarn-cluster, yarn-client
- standalon, yarn-cluster, yarn-client
Spark内存模型
Yarn资源调度过程
Spark内存结构
Spark内存优化方案
- Executor最大任务并行度
- TP = N/C
- 其中N=spark.executor.cores, C=spark.task.cpus
- 任务以Thread方式执行
- 活跃线程可使用内存范围(1/2n, 1/n) why?
- 出现Executor OOM错误(错误代码137,143等)
- 原因:Executor Memory达到上限
- 解决办法:
- 增加每个Task内存使用量
- 增大最大Heap值
- 降低spark.executor.cores数量
- 或者降低单个Task内存消耗量
- 每个partition对应一个任务
- 非SQL类应用 spark.default.parallism
- SQL类应用 spark.sql.shuffle.partition
- 增加每个Task内存使用量