- 资源调优
- 就是多分配内存和
core
- 就是多分配内存和
- 更改高效的序列化方法,
kyro(减少内存开销) -
优化数据结构(减少内存开销)
- 优先使用数组,而不是集合类。优先使用字符串。尽可能少的使用包装类.
- 业务允许的情况下尽量使用
id作为唯一键,不用String类型 - 尽量少用对象嵌套结构,可以用
Json串来代替对象嵌套结构
-
对
RDD进行持久化与Checkpoint- 如果一个
RDD被多次进行Action操作和Transformation操作,那么我们为了提高性能就可以将这个RDD进行持久化。调用RDD的cache方法和persist方法来进行持久化 -
指定序列化的持久化级别
持久化级别 解释 MEMORY_ONLY 将没有序列化的 java对象持久化到内存中,spark的默认持久化级别,如果有的分区内存不够就不会在该分区上持久化MEMORY_AND_DISK 将没有序列化的 java对象持久化到内存中,当内存中不够用的时候,将一部分数据持久化到磁盘中MEMORY_ONLY_SER 将RDD 存储为序列化的Java对象(每个分区一个字节数组)。与反序列化对象相比,这通常更节省空间,特别是在使用快速序列化器时,但是读起来更需要cpu。MEMORY_AND_DISK_SER 与 MEMORY_ONLY_SER类似,但超出内存的数据溢出到磁盘中DISK_ONLY 只将 RDD分区存储在磁盘上。MEMORY_ONLY_2, MEMORY_AND_DISK_2, etc. 以副本的方式持久化数据
- 如果一个
-
Java虚拟机的垃圾回收机制的调优- 如果在持久还
RDD的时候,持久化了大量的数据,那么Java虚拟机的垃圾回收就可能出现了瓶颈,因为Java垃圾回收机制是定期对垃圾对象进行回收,使得内存得以释放。频繁的垃圾回收会影响Spark作业的性能,因为Java的垃圾回收机制其实就是一条线程,在这个线程执行的时候,其他所有的线程都变为等待状态,垃圾回收线程执行完了以后其他线程才会继续执行,这样性能就很受影响 - 默认情况下
Executor内存空间被划分了两块。一块分到60%的空间,用来缓存RDD数据,另一块分为剩下的40%空间,用来分配Task,存放它运行时动态创建的对象。这种情况下,40%空间不足的时候,就会频繁进行垃圾回收,而且还有可能造成Spark作业的运行异常。在实际业务中可以根据具体情况来调整这缓存RDD区域内存的占比大小,调整方式为:new SparkConf().set("spark.storage,memoryFraction","0.5"),使得这两块区域各占50% Executor中的Task运行时的内存空间又被分为两块空间,一块叫做老年代,存放长时间存活的对象,一块叫做新生代,存放短时间存活的对象。新生代又被分为三块空间,第一块叫做Eden区域,第二块叫做Survivor1区域,第三块叫做survivor2区域。在动态创建对象的时候首先会将对象先放入Eden和survivor1区域中,survivor2是备用区域。当Eden区域满了以后就会触发minor gc操作,会回收新生代中不被使用的对象,剩下的对象就会被移入survivor2区域中,这时survivor2和survivor1就会互换角色,survivor1变成了备用区域。如果在新生代中多次触发垃圾回收以后,有的对象长时间不会被回收,那么就说明他是一个长时间存活的对象,就会被移到老年代中。如果Eden区域分的不够大,当发生minor gc的时候存活的对象会被移入备用区域,如果这个时候备用区域满了,短时间存活的对象就有可能被分到老年区域中,当老年区域满了的时候就出触发full gc操作来回收老年代的对象,在回收的时候其他线程停止,影响性能。-
提高Spark作业并行度
- 设置并行度为集群cpu的2~3倍的Task数量,
new SparkConf("spark.default.parallelism","10"),HDFS上每一个block就是一个Partition。
- 设置并行度为集群cpu的2~3倍的Task数量,
-
广播共享数据
- 为每个节点都广播一份共享数据。
val broadcastMyData = sc.broadcast("myData"),以后再次用到这个变量的时候就直接用广播以后的变量即可。
- 为每个节点都广播一份共享数据。
-
数据本地化
- 数据本地化就是说数据与计算它的代码是分开的,那么最后肯定他们俩需要在一起才能完成计算任务,这也样就出现了网络
IO,通常来说传输计算代码要比传输数据快的多,因为代码数据量小,数据的数据量大。Spark也是基于这种机制来进行Task的调度的。 - 数据本地化级别,
-
PROCESS_LOCAL:数据与计算代码在一个Jvm进程中 -
NODE_LOCAL:数据与计算代码在一个节点上,但是不在一个进程中,比如说不在一个Executor上 -
NO_PREF:数据从哪来性能都是一样的 -
RACK_LOCAL:数据和代码在一个机架上 -
ANY:数据可能在任意地方,比如网路可达的任意机器
-
- 在
TaskScheduleImpl提交Task到Executor上去执行的时候,Task在等待一段时间以后,如果Executor一直没有core被释放,那么Task的数据本地化级别就会依次放大一个级别,这个等待时间是可以调整的,这就是一个调优点
- 数据本地化就是说数据与计算它的代码是分开的,那么最后肯定他们俩需要在一起才能完成计算任务,这也样就出现了网络
-
shuffle性能优化-
spark.shuffle.file.buffer- 默认值:
32k - 参数说明:该参数用于设置
shuffle write task的BufferedOutputStream的buffer缓冲大小。将数据写到磁盘文件之前,会先写入buffer缓冲中,待缓冲写满之后,才会溢写到磁盘。 - 调优建议:如果作业可用的内存资源较为充足的话,可以适当增加这个参数的大小(比如
64k),从而减少shuffle write过程中溢写磁盘文件的次数,也就可以减少磁盘IO次数,进而提升性能。在实践中发现,合理调节该参数,性能会有1%~5%的提升。
- 默认值:
-
spark.reducer.maxSizeInFlight- 默认值:48m
- 参数说明:该参数用于设置
shuffle read task的buffer缓冲大小,而这个buffer缓冲决定了每次能够拉取多少数据。 - 调优建议:如果作业可用的内存资源较为充足的话,可以适当增加这个参数的大小(比如
96m),从而减少拉取数据的次数,也就可以减少网络传输的次数,进而提升性能。在实践中发现,合理调节该参数,性能会有1%~5%的提升。
-
spark.shuffle.io.maxRetries- 默认值:3
- 参数说明:
shuffle read task从shuffle write task所在节点拉取属于自己的数据时,如果因为网络异常导致拉取失败,是会自动进行重试的。该参数就代表了可以重试的最大次数。如果在指定次数之内拉取还是没有成功,就可能会导致作业执行失败。 - 调优建议:对于那些包含了特别耗时的
shuffle操作的作业,建议增加重试最大次数(比如60次),以避免由于JVM的full gc或者网络不稳定等因素导致的数据拉取失败。在实践中发现,对于针对超大数据量(数十亿~上百亿)的shuffle过程,调节该参数可以大幅度提升稳定性。
-
spark.shuffle.io.retryWait- 默认值:
5s - 参数说明:具体解释同上,该参数代表了每次重试拉取数据的等待间隔,默认是
5s。 - 调优建议:建议加大间隔时长(比如
60s),以增加shuffle操作的稳定性。
- 默认值:
-
spark.shuffle.memoryFraction- 默认值:
0.2 - 参数说明:该参数代表了
Executor内存中,分配给shuffle read task进行聚合操作的内存比例,默认是20%。 - 调优建议:在资源参数调优中讲解过这个参数。如果内存充足,而且很少使用持久化操作,建议调高这个比例,给
shuffle read的聚合操作更多内存,以避免由于内存不足导致聚合过程中频繁读写磁盘。在实践中发现,合理调节该参数可以将性能提升10%左右。
- 默认值:
-
spark.shuffle.manager- 默认值:sort
- 参数说明:该参数用于设置
ShuffleManager的类型。Spark 1.5以后,有三个可选项:hash、sort和tungsten-sort。HashShuffleManager是Spark 1.2以前的默认选项,但是Spark 1.2以及之后的版本默认都是SortShuffleManager了。tungsten-sort与sort类似,但是使用了tungsten计划中的堆外内存管理机制,内存使用效率更高。 - 调优建议:由于
SortShuffleManager默认会对数据进行排序,因此如果你的业务逻辑中需要该排序机制的话,则使用默认的SortShuffleManager就可以;而如果你的业务逻辑不需要对数据进行排序,那么建议参考后面的几个参数调优,通过bypass机制或优化的HashShuffleManager来避免排序操作,同时提供较好的磁盘读写性能。这里要注意的是,tungsten-sort要慎用,因为之前发现了一些相应的bug。
-
spark.shuffle.sort.bypassMergeThreshold- 默认值:200
- 参数说明:当
ShuffleManager为SortShuffleManager时,如果shuffle read task的数量小于这个阈值(默认是200),则shuffle write过程中不会进行排序操作,而是直接按照未经优化的HashShuffleManager的方式去写数据,但是最后会将每个task产生的所有临时磁盘文件都合并成一个文件,并会创建单独的索引文件。 - 调优建议:当你使用
SortShuffleManager时,如果的确不需要排序操作,那么建议将这个参数调大一些,大于shuffle read task的数量。那么此时就会自动启用bypass机制,map-side就不会进行排序了,减少了排序的性能开销。但是这种方式下,依然会产生大量的磁盘文件,因此shuffle write性能有待提高。
-
spark.shuffle.consolidateFiles- 默认值:false
- 参数说明:如果使用
HashShuffleManager,该参数有效。如果设置为true,那么就会开启consolidate机制,会大幅度合并shuffle write的输出文件,对于shuffle read task数量特别多的情况下,这种方法可以极大地减少磁盘IO开销,提升性能。 - 调优建议:如果的确不需要
SortShuffleManager的排序机制,那么除了使用bypass机制,还可以尝试将spark.shffle.manager参数手动指定为hash,使用HashShuffleManager,同时开启consolidate机制。在实践中尝试过,发现其性能比开启了bypass机制的SortShuffleManager要高出10%~30%。
-
- 如果在持久还
相关文章: