1、程序编写准则

  • 准则一:从同一个数据源尽量只创建一个RDD,后续不同的业务逻辑可以复用该RDD,而不是基于该数据源重新创建一个新的RDD,这样Spark仅仅需要从HDFS上加载一次文件的内容就可以了。

  • 准则二:如果需要对某个RDD进行多次不同的Transformation和Action操作,可以考虑对该RDD进行持久化操作,以避免Action操作触发作业时多次重复计算该RDD。

    因为Spark程序是基于延迟执行和基于Lineage最大化的pipeline,因此当对某个RDD的Action操作触发了作业时,会基于Lineage从后往前推,找到该RDD的源头RDD,然后从前往后计算出结果,当对某个RDD执行了多次Transformation和Action操作,每次Action操作触发了作业时都会重新从源头RDD处计算一遍来获得该RDD,然后再对这个RDD执行相应的操作,这种方式的性能是很差的。当对多次使用的RDD进行持久化之后,Spark就会根据持久化策略,将RDD中的数据保存到内存或者磁盘中,以后每次对该RDD进行算子操作时,都会直接从内存或磁盘中提取持久化的RDD数据,然后执行算子,而不会从源头重新计算一遍这个RDD再执行算子操作。

  • 准则三:从数据源读取数据获得RDD后,要尽早进行filter过滤掉不需要的数据。

  • 准则四:尽量避免使用Shuffle类算子,且在必须Shuffle时尽量减少Shuffle的数据量。

    如果有可能,要尽量避免使用Shuffle类算子,因为Spark作业运行过程中,最消耗性能的地方就是Shuffle过程。Shuffle过程会将分布在集群中多个节点上的包含同一个key的数据拉取到同一个节点上,然后进行聚合或join等操作,这样可能会因为一个节点上处理的key过多,导致内存不够存放,进而溢写到磁盘文件中,因此在Shuffle过程中,可能会发生大量的磁盘文件读写操作,以及数据的网络传输操作,这样会降低程序的执行速度。因此,应当尽可能避免使用reduceByKey、join、distinct和repartition等会进行Shuffle的算子,而应尽量使用Map类的非Shuffle算子。如果因为业务需要,一定要使用Shuffle操作且无法使用Map类算子来代替时,可以通过map-side预聚合的算子来减少Shuffle的数据量。map-side预聚合在每个节点本地map时,对含有相同的key记录进行了聚合操作,类似于MapReduce中的本地combiner,map-side预聚合之后,每个节点本地就只会有一条含有相同的key的记录,其他节点在拉取所有节点上的含有相同的key的记录时,就会大大减少需要拉取的数据量,从而减少磁盘I/O和网络传输开销。其中ReduceByKey和AggregateByKey属于map-side预聚合的算子,会对每个节点本地的含有相同key的记录进行预聚合,而groupByKey算子不会进行预聚合,全部的数据都会在集群的各个节点之间分发和传输,性能相对来说较差。

  • 准则五:熟悉各个算子的背后机制,选择使用高性能的算子。

    • 使用reduceByKey/aggregateByKey替代groupByKey。
    • 使用mapPartitions替代map,因为mapPartitions类的算子,一次函数调用会处理一个Partition所有的数据,而不是一次函数调用处理一条,所以性能相对来说会高一些。但有时候使用mapPartitions会出现OOM(内存溢出)的问题,因为单次函数调用就要处理一个Partition所有的数据,如果内存不够,垃圾回收时是无法回收太多对象的,很可能出现OOM异常,所以使用这类操作时要慎重。
    • 使用foreachPartition替代foreach,这类经典的应用场景是在写记录到数据库时,如果是普通的foreach算子,每次函数调用都需要创建一个数据库连接,然后写一条数据,这样会频繁地创建和销毁数据库连接,性能非常低下;但是如果用foreachPartitions算子一次性处理一个Partition的数据,那么对于每个Partition只要创建一个数据库连接,然后执行批量插入操作,这样性能肯定是比较高的。
    • 使用repartitionAndSortWithinPartitions替代Repartition与Sort类操作。如果在Repartition重分区之后还要进行排序,官方建议直接使用repartitionAndSortWithinPartitions算子,因为该算子可以一边进行重分区的Shuffle操作,一边进行排序。Shuffle与Sort两个操作同时进行,比先Shuffle再Sort来说,性能肯定是比较高的。
    • 对一个RDD执行filter算子后,如果可能过滤掉RDD中较多的数据(比如30%以上的数据),就建议使用coalesce算子,手动减少RDD中的Partition数量,将RDD中的数据压缩到更少的Partition中去,以减少并行度,避免过多开辟Task的开销,因为Task的开辟和销毁也是有开销的,这样只需使用更少的Task即可处理完所有的Partition。
  • 准则六:对大变量考虑使用广播机制。

    有时在开发过程中,会遇到需要在算子函数中使用外部大变量的场景(比如100M的大集合),那么此时就可以考虑使用广播(Broadcast)功能来提升性能,因为在算子函数中使用到外部变量时,默认情况下,Spark会将该变量复制多个副本,通过网络传输到Task中,此时每个Task都有一个变量副本。如果变量本身比较大的话(比如100MB,甚至1GB),那么大量的变量副本在网络上中传输的性能开销,以及在各个节点的Executor中占用过多内存导致的频繁GC,都会极大地影响性能。因此如果使用的外部变量比较大,建议使用Spark的广播功能,对该变量进行广播,广播后的变量会保证在每个Executor的内存中只驻留一份变量副本,而Executor中的Task执行时共享该Executor中的那份变量副本,这样的话,就可以大大减少变量副本的数量,从而减少网络传输的性能开销,并减少对Executor内存的占用开销,降低GC的频率。

  • 准则七:尽可能使用kryo优化序列化性能。

    Spark默认的序列化器是org.apache.spark.serializer.JavaSerializer,但同时也支持使用Kryo序列化器org.apache.spark.serializer.KryoSerializer,由于默认的序列化器的性能和空间表现都比较差,而Kryo序列化器更快,压缩率也更高,所以我们应该优先使用Kryo序列化器而不是默认的序列化器。

2、并行度

并行度指的就是RDD的分区数,由于一个分区对应一个Task,并行度也是一个Stage中的Task数,这些Task被并行处理。Spark有一套自己自动推导出默认的分区数的机制,但是由于Spark自动推导出来的默认分区数很多时候并不理想,必须人为地加以控制来改变并行度。Spark提供了四种改变并行度的方式:

  • 第一种,在使用读取外部数据源的textFile类算子时,可以通过可选的参数minPartitions来显示指定最小的分区数。

  • 第二种,针对已经存在的RDD,可以通过方法repartition()或coalesce()来改变并行度。

    • repartition():会产生shuffle,当任务耗时长且处理的数据量大时,如果计算只发生在部分Executor上,常用repartition()来重新分区,提高并行度,开辟更多的并行计算的任务来完成计算。
    • coalesce():默认不会产生Shuffle,当有大量小任务(任务处理的数据量小且耗时短)时,比如某个RDD的Filter操作后,由于过滤了大量数据,每个分区都只剩下了很少量的数据,这时常用coalesce()来合并分区,调小并行度,减少不必要的任务开辟与销毁的消耗。
  • 第三种,在对RDD进行Reduce类涉及Shuffle操作的算子时,这些算子大都可以接受一个显示指定的参数来确定新产生的RDD的分区数。

  • 第四种,也可以配置spark.default.parallelism来设置默认的并行度。该参数其实指定的就是在对RDD进行Reduce类涉及Shuffle操作的算子时,如果没有对这些算子显示指定参数来确定新产生的RDD的分区数时,这类Reduce类涉及Shuffle操作的算子产生的新的RDD的Partition数量,该参数也指定了Parallelize等没有Parent RDDs类操作的算子所产生的新的RDD的分区数。一个最佳实践是将并行度设置为集群的总的CPU Cores个数的2~3倍,比如Executor的总CPU Core数量为400个,那么设置1000个Task是可以的,此时可以充分利用Spark集群的资源。

3、垃圾回收调优

Spark垃圾回收调优的目标是确保只有长时间存活的RDD才保存到老生代区域;同时,新生代区域需要足够大以保存生命周期比较短的对象。这样,在执行任务期间可以避免执行Full GC。通过收集GC信息检查内存回收是否过于频繁:

  • 如果在任务结束之前执行了很多次Full GC,则表明任务执行的内存空间不足;
  • 如果老生代接近消耗殆尽,那么减少用于缓存的内存空间,可以通过配置属性spark.storage.memoryFraction来完成,通过减少用于缓存对象来提高执行速度是非常值得的。
  • 如果有过多的Minor GC而不是Full GC,那么为Eden分配更大的内存是有益的,可以为Eden分配大于任务执行所需要的内存空间。如果Eden的大小确定为E,那么可以通过-Xmn=4/3xE来设置新生代的大小。例如,如果任务从HDFS读取数据,那么任务需要的内存空间可以从读取的block数量估算出来(注意解压后的block数量通常为解压前的2~3倍)。所以,如果需要同时执行4个任务,block的大小为64M,可以估算出Eden的大小为4x3x64MB,监控内存回收的频率及消耗的时间,并修改相应的参数设置。

总结

Spark性能优化

相关文章:

  • 2021-11-22
  • 2021-11-22
  • 2021-12-05
  • 2022-12-23
  • 2022-12-23
  • 2021-11-22
  • 2021-11-22
  • 2021-11-22
猜你喜欢
  • 2021-08-20
  • 2021-11-22
  • 2021-11-22
  • 2021-08-20
  • 2021-06-10
  • 2021-10-31
相关资源
相似解决方案