2.1版本
- 是什么?
Apache Spark是Apache的一个顶级项目,是一个基于内存的分布式的快速、通用的大规模数据处理引擎。Spark是一站式解决方案,集批处理、实时流计算、交互式查询、图计算与机器学习与一体。
Spark是美国加州大学伯克利分校的AMP实验室研发的,从09年开始敲代码,12年发行0.6.0版本,然后是13年加入apache并且只用了8个月时间成为Apache的顶级项目之一。
用Spark的最大好处是它很快,在逻辑回归(机器学习,迭代计算)的场景下它可以比Hadoop快100倍,有官方图片为证
即使不在特定场景下,Spark的速度也会比Hadoop MapReduce的速度快好几倍,不过因为它是基于内存的分布式计算框架,所以比较的耗内存;迭代的次数越多,操作的数据越大,收益就越大;速度快的原因: 基于内存计算,DAG...
注:Spark基于内存并不是指全部数据都在内存只是中间迭代的结果在内存,cpu进行计算的时候是从内存中读取
- 干什么的?
Spark可以通过基于内存的分布式计算处理海量数据;Spark还支持迭代计算,有效的应对多步数据处理逻辑;Spark支持在海量数据的基础上进行复杂的计算和分析,支持各种挖掘数据和机器学习算法。
- Spark和Hadoop的对比:
首先spark和Hadoop的MR都是分布式的计算框架,Saprk是基于内存,MR是基于HDFS。Spark处理数据的能力一般是MR的十倍以上,Spark中除了基于内存计算外,还有DAG(有向无环图)来切分任务执行先后顺序。
- 怎么用
Spark生态系统
运行模式
Local
多用于本地测试,如在eclipse,idea中写程序测试等。
Standalone
Standalone是Spark自带的一个资源调度框架,它支持完全分布式。
Yarn
Hadoop生态圈里面的一个资源调度框架,Spark也是可以基于Yarn来计算的。
Mesos
资源调度框架。
要基于Yarn来进行资源调度,必须实现AppalicationMaster接口,Spark实现了这个接口,所以可以基于Yarn。
核心RDD
RDD是一个抽象类
怎么用:
- sc.textFile读取文件,返回RDD,读取出来的数据块就是patition(也就是说有多少个block块就有多少个patition)
- 使用某个方法(比如flatMap)作用在RDD上边变成一个新的RDD2,然后再Map(....),变成RDD3
五大特性:
1.Rdd由一系列的partition组成
2.函数是作用在partition上面的
3.RDD之间有一系列的依赖关系
4.分区器作用在K,V格式的RDD上面
5.RDD提供一系列的最佳计算位置,体现了大数据场景下,“计算移动,数据不移动”的理念
哪里体现了RDD的弹性?
patition可以随着block块的数量动态改变,所以个数没有吸纳之,我们可以调整
哪里体现了RDD的容错?
RDD之间有依赖关系,可以由父RDD产生子RDD
哪里体现了分布式?
RDD有partition组成,partition分布在不同的节点上
其他的几个小点:
RDD没有存数据的
TextFile底层就是封装的MR读取HDFS文件的方式
JOB和JOB之间串行运行,JOB内并行
运行框架
以上图中有四个机器节点,Driver和Worker是启动在节点上的进程,运行在JVM中的进程。
Driver与集群节点之间有频繁的通信。Driver负责任务(tasks)的分发和结果的回收。任务的调度。如果task的计算结果非常大就不要回收了。会造成oom。
Worker是Standalone资源调度框架里面资源管理的从节点。也是JVM进程。
Master是Standalone资源调度框架里面资源管理的主节点。也是JVM进程。
算子
转换算子
map/mapToPair
将一个RDD中的每个数据项,通过map中的函数映射变为一个新的元素。
特点:输入一条,输出一条数据。
flatmap/flatMapToPair
先map后flat。与map类似,每个输入项可以映射为0到多个输出项。
reduceBykey
将相同的Key根据相应的逻辑进行处理。
Filter
过滤符合条件的记录数,true保留,false过滤掉。
sortBy/sortBykey
作用在K,V格式的RDD上,对key进行升序或者降序排序。
Sample
随机抽样算子,根据传进去的小数按比例进行又放回或者无放回的抽样。
join/leftouterjoin/rightouterjoin/fullouterjoin
作用在K,V格式的RDD上。根据K进行连接,对(K,V)join(K,W)返回(K,(V,W))
join后的分区数与父RDD分区数多的那一个相同。将2个RDD和成一个RDD
union
合并两个数据集。两个数据集的类型要一致。返回新的RDD的分区数是合并RDD分 区数的总和。
Intersection
取两个数据集的交集
Subtract
取两个数据集的差集
mapPartions
与map类似,遍历的单位是每个partition上的数据(很有用)
distinct
去重(过程map+reduceByKey+map)
cogroup
当调用类型(K,V)和(K,W)的数据上时,返回一个数据集(K,(Iterable<V>,Iterable<W>))
mappartitionsWithIndex
在mappartitons的基础上带上索引值
repartition
增加或减少分区(会产生shuffle,源代码规定就要用shuffle)
coalesce
增加分区和减少分区,增加分区的时候必须要把shuffle的value设置为true。减少分区的时候,或者默认情况下是不产生shuffle的也就是false。这个是根据依赖关系来的。增加分区的时候是宽依赖,如果不设置为true的话,不会增加分区,只会保持原来的分区。
groupByKey
作用在K,V格式的RDD上。根据Key进行分组。作用在(K,V),返回(K,Iterable <V>)。
zip
将两个RDD中的元素(KV格式/非KV格式)变成一个KV格式的RDD,两个RDD的个数必须相同。
zipWithIndex
该函数将RDD中的元素和这个元素在RDD中的索引号(从0开始)组合成(K,V)对。
触发算子
Foreach
循环遍历数据集中的每个元素,运行相应的逻辑。
Take
返回一个包含数据集前n个元素的集合。
Count
返回数据集中的元素数。会在结果计算完成后回收到Driver端。
First
first=take(1),返回数据集中的第一个元素。
Collect
将计算结果回收到Driver端。
foreachPartition
遍历的数据是每个partition的数据。
countByKey
作用到K,V格式的RDD上,根据Key计数相同Key的数据集元素。
countByValue
根据数据集每个元素相同的内容来计数。返回相同内容的元素对应的条数。
reduce
根据聚合逻辑聚合数据集中的每个元素。
持久化算子
cache(数据放内存)做性能优化的
cache = persist() = memory_only
persisit(数据也是存放在内存中)也是做性能优化的
memory_only
disk_only
memory_and_disk
memory_only_ser
memory_and_disk_ser
_2
checkpoint(存在磁盘上checkpoint做容错的)
执行流程
1.当job执行完毕之后,spark会从finalRDD往前追溯,遇到checkpoint的RDD,先标记一下
2.spark框架会启动一个job,重新计算这个RDD的数据
3.一般将数据持久化到hdfs上.
checkpoint会切断RDD之间的依赖关系
优化:在需要checkpoint的RDD上面,先进行cache或persist
几个小点:
persist序列化是占用cpu的,所以用不用序列化要看有没有ser,有ser的救不能序列化
cach和persist是用来做性能提升的,如果重复用算子的话就要用(RDD不存东西),如果有多个重用的RDD,只对最后重用的RDD进行持久化。他不能容错,所以就有了checkpoint持久化到磁盘中