RDD概念:Resilient Distributed Dataset 弹性(优先放在内存,内存不够,磁盘)的分布式(RDD分布在多个节点上)数据集,Spark核心的数据抽象,本质是一个只读的分区记录集合

特点

  • 一组分片(Partition),即数据集的基本组成单位。对于RDD来说,每个分片都会被一个计算任务处理,并决定并行计算的粒度。用户可以在创建RDD时指定RDD的分片个数,如果没有指定,那么就会采用默认值。默认值就是程序所分配到的CPU Core的数目。
  • 一个计算每个分区的函数。Spark中RDD的计算是以分片为单位的,每个RDD都会实现compute函数以达到这个目的。compute函数会对迭代器进行复合,不需要保存每次计算的结果。
  • RDD之间的依赖关系。RDD的每次转换都会生成一个新的RDD,所以RDD之间就会形成类似于流水线一样的前后依赖关系。在部分分区数据丢失时,Spark可以通过这个依赖关系重新计算丢失的分区数据,而不是对RDD的所有分区进行重新计算
  • 一个Partitioner,即RDD的分片函数。当前Spark中实现了两种类型的分片函数,一个是基于哈希的HashPartitioner,另外一个是基于范围的RangePartitioner。只有对于于key-value的RDD,才会有Partitioner,非key-value的RDD的Parititioner的值是None。Partitioner函数不但决定了RDD本身的分片数量,也决定了parent RDD Shuffle输出时的分片数量
  • 一个列表,存储存取每个Partition的优先位置(preferred location)。对于一个HDFS文件来说,这个列表保存的就是每个Partition所在的块的位置。按照“移动数据不如移动计算”的理念,Spark在进行任务调度的时候,会尽可能地将计算任务分配到其所要处理数据块的存储位置。

RDD分片数由什么决定

  • 默认值为2
  • 用户指定
  • hdfs block的个数即分片数量
  • kafka topic partition 数量
  • 如果要提高并行度,需要 repartition

RDD生成方式

  • 并行化方式,采用内部集合
  • 用外部存储系统

RDD算子

  • transformation:延迟加载,常用算子如下图
    Spark学习总结---RDD
  • action:触发job运行,一个action算子代表一个job,常用算子如下图
    Spark学习总结---RDD

RDD持久化:当RDD被多次使用的时候。就要考虑对其持久化

  • RDD持久化方法:cache, persist ,cache内部调用persist方法,持久化级别memory_only,如果要从内存中清除缓存,调用unpersist方法
  • 持久化策略,一般来讲,能不使用磁盘就不用。有的时候,从磁盘读取数据,还不如重新计算一次。因为优化的目的就是为了提高速度,有可能持久化到磁盘,再读取数据,这个耗时,可能时间比重新算一遍的时间更长。
    持久化级别:
    MEMORY_ONLY 以非序列化的Java对象的方式持久化在JVM内存中。如果内存无法完全存储RDD所有的partition,那么那些没有持久化的partition就会在下一次需要使用它的时候,重新被计算。
    MEMORY_AND_DISK 同上,但是当某些partition无法存储在内存中时,会持久化到磁盘中。下次需要使用这些partition时,需要从磁盘上读取。
    MEMORY_ONLY_SER 同MEMORY_ONLY,但是会使用Java序列化方式,将Java对象序列化后进行持久化。可以减少内存开销,但是需要进行反序列化,因此会加大CPU开销。
    MEMORY_AND_DSK_SER 同MEMORY_AND_DSK。但是使用序列化方式持久化Java对象。
    DISK_ONLY 使用非序列化Java对象的方式持久化,完全存储到磁盘上。
    MEMORY_ONLY_2
    MEMORY_AND_DISK_2
    等等 如果是尾部加了2的持久化级别,表示会将持久化数据复用一份,保存到其他节点,从而在数据丢失时,不需要再次计算,只需要使用备份数据即可。

checkpoint容错机制

应用场景:如果Spark应用程序,特别的复杂,从初始的RDD开始,到最后整个应用程序完成,有非常多的步骤,比如超过20个transformation操作。而且,整个应用运行的时间也特别长,比如通常要运行1~5个小时。在上述情况下,就比较适合使用checkpoint功能。因为,对于特别复杂的Spark应用,有很高的风险,会出现某个要反复使用的RDD,因为节点的故障,虽然之前持久化过,但是还是导致数据丢失了。那么也就是说,出现失败的时候,没有容错机制,所以当后面的transformation操作,又要使用到该RDD时,就会发现数据丢失了(CacheManager),此时如果没有进行容错处理的话,那么可能就又要重新计算一次数据,重新算一遍就是用的lineAge,但是这样可能非常的耗时。
checkpoint运行机制:首先调用SparkContext的setCheckpointDir()方法,设置一个容错的文件系统的目录,比如说HDFS;然后,对RDD调用checkpoint()方法。之后,在RDD所处的job运行结束之后,会启动一个单独的job,来将checkpoint过的RDD的数据写入之前设置的文件系统,进行高可用、容错的类持久化操作。
checkPoint的与lineAge的关系:进行checkPoint之后,这个rdd的依赖关系就被remove掉了,当数据丢失,需要恢复的时候,底层是首先判断这个RDD有没有进行过checkPoint,如果没有checkPoint,那么就通过lineAage向上追溯,对把数据进行恢复。
CheckPoint 需要有action算子,启动job才能执行,也算是一个transformation算子。
进行某个rdd进行checkPoint的到时候,会单独给这个rdd启动一个计算任务,写入磁盘,进行checkPoint 之前最好做一个持久化操作 。

共享变量

累加器:Spark提供的Accumulator,主要用于多个节点对一个变量进行共享性的操作。Accumulator只提供了累加的功能。但是确给我们提供了多个task对一个变量并行操作的功能。但是task只能对Accumulator进行累加操作,不能读取它的值。只有Driver程序可以读取Accumulator的值
广播变量(只读):当在Executor端用到了Driver变量,不使用广播变量,在每个Executor中有多少个task就有多少个Driver端变量副本。如果使用广播变量在每个Executor端中只有一份Driver端的变量副本。
广播变量的读取:首先读取端会尝试从本地BlockManager直接读取未切分的完整数据;如果不存在会尝试从本地BlockManager读取切分的数据块;如果都不存在,则从远端的driver或executor拉取,拉取每个数据块时,都会随机选择一个持有该数据块的executor或driver进行拉取,这样可以减少各个节点的网络IO压力

相关文章: