Spark RDD是Spark 核心基石
Transformation 操作
RDD的所有转换操作都是lazy模式,即Spark不会立刻计算结果,而只是简单的记住所有对数据集的转换操作。这些转换只有遇到action操作的时候才会开始计算。这样的设计使得Spark更加的高效,例如,对一个输入数据做一次map操作后进行reduce操作,只有reduce的结果返回给driver,而不是把数据量更大的map操作后的数据集传递给driver。
惰性求值(lazy模式)
RDD 的转化操作都是惰性求值的。这意味着在被调用行动操作之前Spark 不会开始计算
读取数据到RDD的操作也是惰性的
惰性求值的好处
Spark 使用惰性求值可以把一些操作合并到一起来减少计算数据的步骤。在类似 Hadoop
MapReduce 的系统中,开发者常常花费大量时间考虑如何把操作组合到一起,以减少
MapReduce 的周期数。
而在Spark 中,写出一个非常复杂的映射并不见得能比使用很多简单的连续操作获得好很多的性能。因此,用户可以用更小的操作来组织他们的程序,这样也使这些操作更容易管理。
转换操作
RDD 的转化操作是返回新RDD 的操作
我们不应该把RDD 看作存放着特定数据的数据集,而最好把每个RDD 当作我们通过转化操作构建出来的、记录如何计算数据的指令列表。
基本转换操作例子
对一个数据{1,2,3}的RDD进行基本RDD转换操作
基本转换操作1
对一个数据{1,2,3}和{3,4,5}的RDD进行两个RDD转换操作
控制操作
● persist操作,可以将RDD持久化到不同层次的存储介质,以便后续操作重复使用。1)cache:RDD[T]
2)persist:RDD[T] 3)Persist(level:StorageLevel):RDD[T]
● checkpoint
将RDD持久化到HDFS中,与persist操作不同的是checkpoint会切断此RDD之前的依赖关 系,而persist依然保留RDD的依赖关系。
Action操作
Pair RDD
● 包含键值对类型的RDD被称作Pair RDD
● Pair RDD通常用来进行聚合计算
Pair RDD通常由普通RDD做ETL转换而来
创建Pair RDD
Pair RDD的transformation操作
Pair RDD 可以使用所有标准RDD 上转化操作,还提供了特有的转换操作
Pair RDD的转换操作(以键值对集合{(1,2),(3,4),(3,6)})
针对连个Pair RDD的操作(rdd={(1,2),(3,4),(3,6)}other={(3,9)})
Pair RDD的action操作
所有基础RDD 支持的行动操作也都在pair RDD 上可用
Pair RDD的行动操作(以{(1,2),(3,4),(5,6)})
Pair RDD的分区控制
Spark 中所有的键值对RDD 都可以进行分区控制---自定义分区
自定义分区的好处
1、避免数据倾斜
2、控制task并行度
RDD持久性(缓存操作)
1、使用 persist 和 cache 方法将任意 RDD 缓存到内存、磁盘文件系统中
2、缓存是容错的可以通过构建它的 transformation自劢重构。
3、被缓存的 RDD 被使用的时,存取速度会被大大加速
4、persist可以指定一个StorageLevel
Spark中最重要的功能之一是在整个操作中持续(或缓存)内存中的数据集。当持久化RDD时,每个节点都存储它在内存中计算的所有分区,并在该数据集上的其他操作(或从中派生的数据集)中重用它们。这可以使未来的actions 更快(通常超过10倍)。缓存是迭代算法和快速交互式使用的关键工具。
您可以将RDD标记为使用其上的persist()或cache()方法持久化。第一次在Action中计算时,它将保存在节点的内存中。Spark的缓存是容错的 - 如果RDD的任何分区丢失,它将自动使用最初创建它的转换重新计算。
此外,每个持久RDD可以使用不同的存储级别进行存储,例如,允许您将数据集保存在磁盘上,将其保存在内存中,但作为序列化的Java对象(以节省空间),将其复制到节点上。这些级别通过传递一个 StorageLevel对象(Scala, Java, Python)来设置persist()。该cache()方法是使用默认存储级别的简写,它是StorageLevel.MEMORY_ONLY(将反序列化对象存储在内存中)。
全套存储级别
val NONE = new StorageLevel(false, false, false, false)
val DISK_ONLY = new StorageLevel(true, false, false, false)
val DISK_ONLY_2 = new StorageLevel(true, false, false, false, 2)
val MEMORY_ONLY = new StorageLevel(false, true, false, true)
val MEMORY_ONLY_2 = new StorageLevel(false, true, false, true, 2)
val MEMORY_ONLY_SER = new StorageLevel(false, true, false, false)
val MEMORY_ONLY_SER_2 = new StorageLevel(false, true, false, false, 2)
val MEMORY_AND_DISK = new StorageLevel(true, true, false, true)
val MEMORY_AND_DISK_2 = new StorageLevel(true, true, false, true, 2)
val MEMORY_AND_DISK_SER = new StorageLevel(true, true, false, false)
val MEMORY_AND_DISK_SER_2 = new StorageLevel(true, true, false, false, 2)
val OFF_HEAP = new StorageLevel(false, false, true, false)
MEMORY_ONLY
将RDD作为反序列化的Java对象存储在JVM中。如果RDD不适合内存,则某些分区将不会被缓存,并会在每次需要时重新计算。这是默认级别。
MEMORY_AND_DISK
将RDD作为反序列化的Java对象存储在JVM中。如果RDD不适合内存,请存储不适合磁盘的分区,并在需要时从中读取它们。
MEMORY_ONLY_SER
(Java和Scala)
将RDD存储为序列化的 Java对象(每个分区一个字节的数组)。与反序列化的对象相比,这通常更节省空间,特别是在使用 快速序列化器时,但需要更多的CPU密集型读取。
MEMORY_AND_DISK_SER
(Java和Scala)
与MEMORY_ONLY_SER类似,但将不适合内存的分区溢出到磁盘上,而不是每次需要时重新计算它们。
DISK_ONLY
将RDD分区仅存储在磁盘上。
MEMORY_ONLY_2,MEMORY_AND_DISK_2等
与上面的级别相同,但复制两个群集节点上的每个分区。
OFF_HEAP(实验)
与MEMORY_ONLY_SER类似,但将数据存储在 堆内存储器中。这需要启用堆堆内存。
注意:
在Python中,存储的对象将始终与Pickle库串行化,所以选择序列化级别无关紧要。Python中的可用存储级别包括MEMORY_ONLY,MEMORY_ONLY_2, MEMORY_AND_DISK,MEMORY_AND_DISK_2,DISK_ONLY,和DISK_ONLY_2。
reduceByKey即使用户没有持久化,Spark也会在洗牌操作中自动保存一些中间数据(例如)persist。这是为了避免在洗牌过程中节点失败时重新计算整个输入。我们仍建议用户调用persist生成的RDD,如果他们打算重用它。
RDD底层实现原理
RDD是一个分布式数据集,顾名思义,其数据应该分部存储于多台机器上。事实上,每个RDD的数据都以Block的形式存储于多台机器上,下图是Spark的RDD存储架构图,其中每个Executor会启动一个BlockManagerSlave,并管理一部分Block;而Block的元数据由Driver节点的BlockManagerMaster保存。BlockManagerSlave生成Block后向BlockManagerMaster注册该Block,BlockManagerMaster管理RDD与Block的关系,当RDD不再需要存储的时候,将向BlockManagerSlave发送指令删除相应的Block。
RDD cache的原理
RDD的转换过程中,并不是每个RDD都会存储,如果某个RDD会被重复使用,或者计算其代价很高,那么可以通过显示调用RDD提供的cache()方法,把该RDD存储下来。那RDD的cache是如何实现的呢?
RDD中提供的cache()方法只是简单的把该RDD放到cache列表中。当RDD的iterator被调用时,通过CacheManager把RDD计算出来,并存储到BlockManager中,下次获取该RDD的数据时便可直接通过CacheManager从BlockManager读出。