RDD容错机制
分布式系统通常在一个机器集群上运行,同时运行的几百台机器中某些出问题的概率大大增加,所以容错设计是分布式系统的一个重要能力。
Spark以前的集群容错处理模型,像MapReduce,将计算转换为一个有向无环图(DAG)的任务集合,这样可以通过重复执行DAG里的一部分任务来完成容错恢复。但是由于主要的数据存储在分布式文件系统中,没有提供其他存储的概念,容错过程需要在网络上进行数据复制,从而增加了大量的消耗。所以,分布式编程中经常需要做检查点,即将某个时机的中间数据写到存储(通常是分布式文件系统)中。
RDD也是一个DAG,每一个RDD都会记住创建该数据集需要哪些操作,跟踪记录RDD的继承关系,这个关系在Spark里面叫lineage(血缘关系)。当一个RDD的某个分区丢失时,RDD是有足够的信息记录其如何通过其他RDD进行计算,且只需重新计算该分区,这是Spark的一个创新。
RDD的缓存
概述
相比Hadoop MapReduce来说,Spark计算具有巨大的性能优势,其中很大一部分原因是Spark对于内存的充分利用,以及提供的缓存机制。
RDD持久化(缓存)
持久化在早期被称作缓存(cache),但缓存一般指将内容放在内存中。虽然持久化操作在绝大部分情况下都是将RDD缓存在内存中,但一般都会在内存不够时用磁盘顶上去(比操作系统默认的磁盘交换性能高很多)。当然,也可以选择不使用内存,而是仅仅保存到磁盘中。所以,现在Spark使用持久化(persistence)这一更广泛的名称。
如果一个RDD不止一次被用到,那么就可以持久化它,这样可以大幅提升程序的性能,甚至达10倍以上。
默认情况下,RDD只使用一次,用完即扔,再次使用时需要重新计算得到,而持久化操作避免了这里的重复计算,实际测试也显示持久化对性能提升明显,这也是Spark刚出现时被人称为内存计算框架的原因。
假设首先进行了RDD0→RDD1→RDD2的计算作业,那么计算结束时,RDD1就已经缓存在系统中了。在进行RDD0→RDD1→RDD3的计算作业时,由于RDD1已经缓存在系统中,因此RDD0→RDD1的转换不会重复进行,计算作业只须进行RDD1→RDD3的计算就可以了,因此计算速度可以得到很大提升。
持久化的方法是调用persist()函数,除了持久化至内存中,还可以在persist()中指定storage level参数使用其他的类型,具体如下:
1)MEMORY_ONLY : 将 RDD 以反序列化的 Java 对象的形式存储在 JVM 中. 如果内存空间不够,部分数据分区将不会被缓存,在每次需要用到这些数据时重新进行计算. 这是默认的级别。
cache()方法对应的级别就是MEMORY_ONLY级别
2)MEMORY_AND_DISK:将 RDD 以反序列化的 Java 对象的形式存储在 JVM 中。
如果内存空间不够,将未缓存的数据分区存储到磁盘,在需要使用这些分区时从磁盘读取。
3)MEMORY_ONLY_SER :将 RDD 以序列化的 Java 对象的形式进行存储(每个分区为一个 byte 数组)。这种方式会比反序列化对象的方式节省很多空间,尤其是在使用 fast serialize时会节省更多的空间,但是在读取时会使得 CPU 的 read 变得更加密集。如果内存空间不够,部分数据分区将不会被缓存,在每次需要用到这些数据时重新进行计算。
4)MEMORY_AND_DISK_SER :类似于 MEMORY_ONLY_SER ,但是溢出的分区会存储到磁盘,而不是在用到它们时重新计算。如果内存空间不够,将未缓存的数据分区存储到磁盘,在需要使用这些分区时从磁盘读取。
5)DISK_ONLY:只在磁盘上缓存 RDD。
6)MEMORY_ONLY_2, MEMORY_AND_DISK_2, etc. :与上面的级别功能相同,
只不过每个分区在集群中两个节点上建立副本。
7)OFF_HEAP 将数据存储在 off-heap memory 中。使用堆外内存,这是Java虚拟机里面的概念,堆外内存意味着把内存对象分配在Java虚拟机的堆以外的内存,这些内存直接受操作系统管理(而不是虚拟机)。使用堆外内存的好处:可能会利用到更大的内存存储空间。但是对于数据的垃圾回收会有影响,需要程序员来处理
注意,可能带来一些GC回收问题。
Spark 也会自动持久化一些在 shuffle 操作过程中产生的临时数据(比如 reduceByKey),即便是用户并没有调用持久化的方法。这样做可以避免当 shuffle 阶段时如果一个节点挂掉了就得重新计算整个数据的问题。如果用户打算多次重复使用这些数据,我们仍然建议用户自己调用持久化方法对数据进行持久化。
使用缓存
scala> import org.apache.spark.storage._
scala> val rdd1=sc.makeRDD(1 to 5)
scala> rdd1.cache //cache只有一种默认的缓存级别,即MEMORY_ONLY
scala> rdd1.persist(StorageLevel.MEMORY_ONLY)
缓存数据的清除
Spark 会自动监控每个节点上的缓存数据,然后使用 least-recently-used (LRU) 机制来处理旧的缓存数据。如果你想手动清理这些缓存的 RDD 数据而不是去等待它们被自动清理掉,
可以使用 RDD.unpersist( ) 方法。
RDD缓存机制的作用:
当把某个分区的数据缓存之后,以后如果需要 重复用到此RDD数据,不需要重新计算,提高性能
从数据容错角度来看,当一个DAG的计算链很长时,比如有100个RDD,可以选择在RDD10,RDD20,RDD30.。。做一次缓存,可以避免某个分区数据丢失,倒是整个计算链重新计算。
Checkpoint机制
checkpoint的意思就是建立检查点,类似于快照,例如在spark计算里面 计算流程DAG特别长,服务器需要将整个DAG计算完成得出结果,但是如果在这很长的计算流程中突然中间算出的数据丢失了,spark又会根据RDD的依赖关系从头到尾计算一遍,这样子就很费性能,当然我们可以将中间的计算结果通过cache或者persist放到内存或者磁盘中,但是这样也不能保证数据完全不会丢失,存储的这个内存出问题了或者磁盘坏了,也会导致spark从头再根据RDD计算一遍,所以就有了checkpoint,其中checkpoint的作用就是将DAG中比较重要的中间数据做一个检查点将结果存储到一个高可用的地方
代码示例:
object Driver2 {
def main(args: Array[String]): Unit = {
val conf=new SparkConf().setMaster("local").setAppName("wordcount")
val sc=new SparkContext(conf)
sc.setCheckpointDir("hdfs://hadoop01:9000/check01")
val data=sc.textFile("d://data/word.txt")
data.cache()
data.checkpoint()
val wordcount=data.flatMap {_.split(" ")}.map {(_,1)}.reduceByKey(_+_)
wordcount.cache()
wordcount.checkpoint()
wordcount.foreach{println}
}
}
总结:Spark的CheckPoint机制很重要,也很常用,尤其在机器学习中的一些迭代算法中很常见。比如一个算法迭代10000次,如果不适用缓冲机制,如果某分区数据丢失,会导致整个计算链重新计算,所以引入缓存机制。但是光引入缓存,也不完全可靠,比如缓存丢失或缓存存储不下,也会导致重新计算,所以使用CheckPoint机制再做一层保证。
补充:检查目录的路径,一般都是设置到HDFS上
Spark共享变量
Spark程序的大部分操作都是RDD操作,通过传入函数给RDD操作函数来计算。这些函数在不同的节点上并发执行,但每个内部的变量有不同的作用域,不能相互访问,所以有时会不太方便,Spark提供了两类共享变量供编程使用——广播变量和计数器。
-
广播变量
这是一个只读对象,在所有节点上都有一份缓存,创建方法是SparkContext.broadcast(),比如:
scala> val broadcastVar = sc.broadcast(Array(1, 2, 3))
broadcastVar: org.apache.spark.broadcast.Broadcast[Array[Int]] = Broadcast(0)
scala> broadcastVar.value
res0: Array[Int] = Array(1, 2, 3)
注意,广播变量是只读的,所以创建之后再更新它的值是没有意义的,一般用val修饰符来定义广播变量。 -
计数器
计数器只能增加,是共享变量,用于计数或求和。
计数器变量的创建方法是SparkContext.accumulator(v, name),其中v是初始值,name是名称。
示例如下:
scala> val accum = sc.accumulator(0, “My Accumulator”)
accum: org.apache.spark.Accumulator[Int] = 0
scala> sc.parallelize(Array(1, 2, 3, 4)).foreach(x => accum += x)
scala> accum.value
res1: Int = 10
spark解决数据倾斜问题
文件1
id name
1 tom
2 rose
文件2
id school sno
1 s1 211
2 s2 222
3 s3 233
4 s2 244
期望得到的数据 :
1 tom s1
2 rose s2
将少量的数据转化为Map进行广播,广播会将此 Map 发送到每个节点中,如果不进行广播,每个task执行时都会去获取该Map数据,造成了性能浪费。
完整代码
import org.apache.spark.{SparkContext, SparkConf}
import scala.collection.mutable.ArrayBuffer
object joinTest extends App{
val conf = new SparkConf().setMaster(“local[2]”).setAppName(“test”)
val sc = new SparkContext(conf)
/**
- map-side-join
- 取出小表中出现的用户与大表关联后取出所需要的信息
- */
val people_info = sc.parallelize(Array((“1”,“tom”),(“2”,“rose”))).collectAsMap()
val student_all = sc.parallelize(Array((“1”,“s1”,“211”),
(“1”,“s2”,“222”),
(“1”,“s3”,“233”),
(“1”,“s2”,“244”)))
//将需要关联的小表进行广播
val people_bc = sc.broadcast(people_info)
/**
- 使用mapPartition而不是用map,减少创建broadCastMap.value的空间消耗
- 同时匹配不到的数据也不需要返回()
- */
val res = student_all.mapPartitions(iter =>{
//获取小表的数据
val stuMap = people_bc.value
val arrayBuffer = ArrayBuffer(String,String,String)
//做两表的操作
iter.foreach{case (idCard,school,sno) =>{
if(stuMap.contains(idCard)){
arrayBuffer.+= ((idCard, stuMap.getOrElse(idCard,""),school))
}
}}
arrayBuffer.iterator
})