一:避免创建重复的RDD

通常来说,我们在开发一个Spark作业时,首先是基于某个数据源(比如Hive表或HDFS文件)创建一个初始的RDD;

接着对这个RDD执行某个算子操作,然后得到下一个RDD;

以此类推,循环往复,直到计算出最终我们需要的结果。

在这个过程中,多个RDD会通过不同的算子操作(比如map、reduce等)串起来,这个“RDD串”,就是RDD lineage,也就是“RDD的血缘关系链”。

我们在开发过程中要注意:对于同一份数据,只应该创建一个RDD,不能创建多个RDD来代表同一份数据。

一些Spark初学者在刚开始开发Spark作业时,或者是有经验的工程师在开发RDD lineage极其冗长的Spark作业时,可能会忘了自己之前对于某一份数据已经创建过一个RDD了,从而导致对于同一份数据,创建了多个RDD。这就意味着,我们的Spark作业会进行多次重复计算来创建多个代表相同数据的RDD,进而增加了作业的性能开销。

二:尽可能复用同一个RDD

除了要避免在开发过程中对一份完全相同的数据创建多个RDD之外,在对不同的数据执行算子操作时还要尽可能地复用一个RDD。

比如说,有一个RDD的数据格式是key-value类型的,另一个是单value类型的,这两个RDD的value数据是完全一样的。那么此时我们可以只使用key-value类型的那个RDD,因为其中已经包含了另一个的数据。对于类似这种多个RDD的数据有重叠或者包含的情况,我们应该尽量复用一个RDD,这样可以尽可能地减少RDD的数量,从而尽可能减少算子执行的次数。

(一)错误用例

// 有一个<Long, String>格式的RDD,即rdd1。
// 接着由于业务需要,对rdd1执行了一个map操作,创建了一个rdd2,而rdd2中的数据仅仅是rdd1中的value值而已,也就是说,rdd2是rdd1的子集。
JavaPairRDD<Long, String> rdd1 = ...
JavaRDD<String> rdd2 = rdd1.map(...)

// 分别对rdd1和rdd2执行了不同的算子操作。
rdd1.reduceByKey(...)
rdd2.map(...)

(二)改进方法

JavaPairRDD<Long, String> rdd1 = ...
rdd1.reduceByKey(...)
rdd1.map(tuple._2...)

第二种方式相较于第一种方式而言,很明显减少了一次rdd2的计算开销。
但是到这里为止,优化还没有结束,对rdd1我们还是执行了两次算子操作,rdd1实际上还是会被计算两次。
因此还需要配合“原则三:对多次使用的RDD进行持久化”进行使用,才能保证一个RDD被多次使用时只被计算一次。

三:对多次使用的RDD进行持久化

(一)持久化介绍

当你在Spark代码中多次对一个RDD做了算子操作后,恭喜,你已经实现Spark作业第一步的优化了,也就是尽可能复用RDD。此时就该在这个基础之上,进行第二步优化了,也就是要保证对一个RDD执行多次算子操作时,这个RDD本身仅仅被计算一次。

Spark中对于一个RDD执行多次算子的默认原理是这样的:每次你对一个RDD执行一个算子操作时,都会重新从源头处计算一遍,计算出那个RDD来,然后再对这个RDD执行你的算子操作。这种方式的性能是很差的。

因此对于这种情况,我们的建议是:对多次使用的RDD进行持久化。此时Spark就会根据你的持久化策略,将RDD中的数据保存到内存或者磁盘中。以后每次对这个RDD进行算子操作时,都会直接从内存或磁盘中提取持久化的RDD数据,然后执行算子,而不会从源头处重新计算一遍这个RDD,再执行算子操作。

(二)持久化方法

如果要对一个RDD进行持久化,只要对这个RDD调用cache()和persist()即可。

cache()方法表示:使用非序列化的方式将RDD中的数据全部尝试持久化到内存中。

persist()方法表示:手动选择持久化级别,并使用指定的方式进行持久化。

第二次执行reduce算子时,就会直接从内存中提取数据进行计算,不会重复计算一个rdd。

val rdd1 = sc.textFile("hdfs://192.168.0.1:9000/hello.txt").cache()
rdd1.map(...)
rdd1.reduce(...)
val rdd1 = sc.textFile("hdfs://192.168.0.1:9000/hello.txt").persist(StorageLevel.MEMORY_AND_DISK_SER)
rdd1.map(...)
rdd1.reduce(...)

(三)persist持久化级别

对于persist()方法而言,我们可以根据不同的业务场景选择不同的持久化级别。

持久化级别 含义解释
MEMORY_ONLY 使用未序列化的Java对象格式,将数据保存在内存中。如果内存不够存放所有的数据,则数据可能就不会进行持久化。那么下次对这个RDD执行算子操作时,那些没有被持久化的数据,需要从源头处重新计算一遍。这是默认的持久化策略,使用cache()方法时,实际就是使用的这种持久化策略
MEMORY_AND_DISK 使用未序列化的Java对象格式,优先尝试将数据保存在内存中。如果内存不够存放所有的数据,会将数据写入磁盘文件中,下次对这个RDD执行算子时,持久化在磁盘文件中的数据会被读取出来使用。
MEMORY_ONLY_SER 基本含义同MEMORY_ONLY。唯一的区别是,会将RDD中的数据进行序列化,RDD的每个partition会被序列化成一个字节数组。这种方式更加节省内存,从而可以避免持久化的数据占用过多内存导致频繁GC。
MEMORY_AND_DISK_SER 基本含义同MEMORY_AND_DISK。唯一的区别是,会将RDD中的数据进行序列化,RDD的每个partition会被序列化成一个字节数组。这种方式更加节省内存,从而可以避免持久化的数据占用过多内存导致频繁GC。
DISK_ONLY 使用未序列化的Java对象格式,将数据全部写入磁盘文件中。
MEMORY_ONLY_2, MEMORY_AND_DISK_2, 等等. 对于上述任意一种持久化策略,如果加上后缀_2,代表的是将每个持久化的数据,都复制一份副本,并将副本保存到其他节点上。这种基于副本的持久化机制主要用于进行容错。假如某个节点挂掉,节点的内存或磁盘中的持久化数据丢失了,那么后续对RDD计算时还可以使用该数据在其他节点上的副本。如果没有副本的话,就只能将这些数据从源头处重新计算一遍了。

(四)持久化策略选取

默认情况下,性能最高的当然是MEMORY_ONLY,但前提是你的内存必须足够足够大,可以绰绰有余地存放下整个RDD的所有数据。因为不进行序列化与反序列化操作,就避免了这部分的性能开销;对这个RDD的后续算子操作,都是基于纯内存中的数据的操作,不需要从磁盘文件中读取数据,性能也很高;而且不需要复制一份数据副本,并远程传送到其他节点上。但是这里必须要注意的是,在实际的生产环境中,恐怕能够直接用这种策略的场景还是有限的,如果RDD中数据比较多时(比如几十亿),直接用这种持久化级别,会导致JVM的OOM内存溢出异常。
1.性能最高的当然是MEMORY_ONLY,但前提是你的内存必须足够足够大

相关文章:

  • 2021-09-27
  • 2021-05-21
  • 2021-12-12
  • 2021-05-10
  • 2021-10-31
  • 2021-09-25
  • 2021-05-21
猜你喜欢
  • 2021-06-21
  • 2021-12-29
  • 2021-12-24
  • 2021-07-28
  • 2022-02-14
  • 2022-02-26
  • 2021-11-30
相关资源
相似解决方案