Spark RDD是Spark 核心基石

Transformation 操作

RDD的所有转换操作都是lazy模式,即Spark不会立刻计算结果,而只是简单的记住所有对数据集的转换操作。这些转换只有遇到action操作的时候才会开始计算。这样的设计使得Spark更加的高效,例如,对一个输入数据做一次map操作后进行reduce操作,只有reduce的结果返回给driver,而不是把数据量更大的map操作后的数据集传递给driver

Spark RDD 分析总结(二)

惰性求值(lazy模式

RDD 的转化操作都是惰性求值的。这意味着在被调用行动操作之前Spark 不会开始计算

读取数据到RDD的操作也是惰性的

惰性求值的好处

Spark 使用惰性求值可以把一些操作合并到一起来减少计算数据的步骤。在类似 Hadoop

MapReduce 的系统中,开发者常常花费大量时间考虑如何把操作组合到一起,以减少

MapReduce 的周期数。

而在Spark   中,写出一个非常复杂的映射并不见得能比使用很多简单的连续操作获得好很多的性能。因此,用户可以用更小的操作来组织他们的程序,这样也使这些操作更容易管理。

转换操作

RDD 的转化操作是返回新RDD 的操作

我们不应该把RDD  看作存放着特定数据的数据集,而最好把每个RDD  当作我们通过转化操作构建出来的、记录如何计算数据的指令列表。

基本转换操作例子

对一个数据{1,2,3}RDD进行基本RDD转换操作

基本转换操作1

 Spark RDD 分析总结(二)

对一个数据{1,2,3}{3,4,5}RDD进行两个RDD转换操作

 Spark RDD 分析总结(二)

控制操作

● persist操作,可以将RDD持久化到不同层次的存储介质,以便后续操作重复使用。1)cache:RDD[T]

2)persist:RDD[T] 3)Persist(level:StorageLevel):RDD[T]

● checkpoint

RDD持久化到HDFS中,与persist操作不同的是checkpoint会切断此RDD之前的依赖关   系,而persist依然保留RDD的依赖关系。

Action操作

 Spark RDD 分析总结(二)

Spark RDD 分析总结(二)

Pair RDD

● 包含键值对类型的RDD被称作Pair RDD

● Pair RDD通常用来进行聚合计算

Pair RDD通常由普通RDDETL转换而

创建Pair RDD

 Spark RDD 分析总结(二)

Pair RDDtransformation操作

Pair RDD 可以使用所有标准RDD 上转化操作,还提供了特有的转换操作

Pair RDD的转换操作(以键值对集合{(1,2),(3,4),(3,6)})

 Spark RDD 分析总结(二)

针对连个Pair RDD的操作(rdd={(1,2),(3,4),(3,6)}other={(3,9)})

 Spark RDD 分析总结(二)

Pair RDDaction操作

所有基础RDD 支持的行动操作也都在pair RDD 上可用

Pair RDD的行动操作(以{(1,2),(3,4),(5,6)})

 Spark RDD 分析总结(二)

Pair RDD的分区控制

Spark 中所有的键值对RDD 都可以进行分区控制---自定义分区

自定义分区的好处

1避免数据倾斜

 2控制task并行度

 Spark RDD 分析总结(二)

RDD持久性(缓存操作)

1使用 persist cache 方法将任意 RDD 缓存到内存、磁盘文件系统中

2缓存是容错的可以通过构建它的 transformation自劢重构。

3被缓存的 RDD 被使用的时,存取速度会被大大加速

4persist可以指定一个StorageLevel

 Spark RDD 分析总结(二)

Spark中最重要的功能之一是在整个操作中持续(或缓存)内存中的数据集。当持久化RDD时,每个节点都存储它在内存中计算的所有分区,并在该数据集上的其他操作(或从中派生的数据集)中重用它们。这可以使未来的actions 更快(通常超过10倍)。缓存是迭代算法和快速交互式使用的关键工具。

您可以将RDD标记为使用其上的persist()cache()方法持久化。第一次在Action中计算时,它将保存在节点的内存中。Spark的缓存是容错的 - 如果RDD的任何分区丢失,它将自动使用最初创建它的转换重新计算。

此外,每个持久RDD可以使用不同的存储级别进行存储,例如,允许您将数据集保存在磁盘上,将其保存在内存中,但作为序列化的Java对象(以节省空间),将其复制到节点上。这些级别通过传递一个 StorageLevel对象(Scala Java Python)来设置persist()。该cache()方法是使用默认存储级别的简写,它是StorageLevel.MEMORY_ONLY(将反序列化对象存储在内存中)。

全套存储级别

 Spark RDD 分析总结(二)

val NONE = new StorageLevel(false, false, falsefalse)

val DISK_ONLY = new StorageLevel(true, false, false, false)

val DISK_ONLY_2 = new StorageLevel(true, false, false, false,  2)

val MEMORY_ONLY new StorageLevel(falsetruefalsetrue)

val MEMORY_ONLY_2 = new StorageLevel(false, true, false, true, 2)

val MEMORY_ONLY_SER = new StorageLevel(false, true, false, false)

val MEMORY_ONLY_SER_2 = new StorageLevel(false, true, false, false, 2)

val MEMORY_AND_DISK = new StorageLevel(true, true, falsetrue)

val MEMORY_AND_DISK_2 = new StorageLevel(true, true, falsetrue, 2)

val MEMORY_AND_DISK_SER = new StorageLevel(true, true, falsefalse)

val MEMORY_AND_DISK_SER_2 = new StorageLevel(true, true, false, false, 2)

val OFF_HEAP = new StorageLevel(false, false, true, false)

MEMORY_ONLY 

RDD作为反序列化的Java对象存储在JVM中。如果RDD不适合内存,则某些分区将不会被缓存,并会在每次需要时重新计算。这是默认级别。

MEMORY_AND_DISK

RDD作为反序列化的Java对象存储在JVM中。如果RDD不适合内存,请存储不适合磁盘的分区,并在需要时从中读取它们。

MEMORY_ONLY_SER 
JavaScala

RDD存储为序列化的 Java对象(每个分区一个字节的数组)。与反序列化的对象相比,这通常更节省空间,特别是在使用 快速序列化器时,但需要更多的CPU密集型读取。

MEMORY_AND_DISK_SER 
JavaScala

MEMORY_ONLY_SER类似,但将不适合内存的分区溢出到磁盘上,而不是每次需要时重新计算它们。

DISK_ONLY

RDD分区仅存储在磁盘上。

MEMORY_ONLY_2MEMORY_AND_DISK_2

与上面的级别相同,但复制两个群集节点上的每个分区。

OFF_HEAP(实验)

MEMORY_ONLY_SER类似,但将数据存储在 堆内存储器中。这需要启用堆堆内存。

注意:

 Python中,存储的对象将始终与Pickle库串行化,所以选择序列化级别无关紧要。Python中的可用存储级别包括MEMORY_ONLYMEMORY_ONLY_2 MEMORY_AND_DISKMEMORY_AND_DISK_2DISK_ONLY,和DISK_ONLY_2

reduceByKey即使用户没有持久化Spark也会在洗牌操作中自动保存一些中间数据(例如)persist。这是为了避免在洗牌过程中节点失败时重新计算整个输入。我们仍建议用户调用persist生成的RDD,如果他们打算重用它。

RDD底层实现原理

RDD是一个分布式数据集,顾名思义,其数据应该分部存储于多台机器上。事实上,每个RDD的数据都以Block的形式存储于多台机器上,下图是SparkRDD存储架构图,其中每个Executor会启动一个BlockManagerSlave,并管理一部分Block;而Block的元数据由Driver节点的BlockManagerMaster保存。BlockManagerSlave生成Block后向BlockManagerMaster注册该BlockBlockManagerMaster管理RDDBlock的关系,当RDD不再需要存储的时候,将向BlockManagerSlave发送指令删除相应的Block

Spark RDD 分析总结(二)

RDD cache的原理

RDD的转换过程中,并不是每个RDD都会存储,如果某个RDD会被重复使用,或者计算其代价很高,那么可以通过显示调用RDD提供的cache()方法,把该RDD存储下来。那RDDcache是如何实现的呢?

RDD中提供的cache()方法只是简单的把该RDD放到cache列表中。当RDDiterator被调用时,通过CacheManagerRDD计算出来,并存储到BlockManager中,下次获取该RDD的数据时便可直接通过CacheManagerBlockManager读出。

相关文章: