Spark 作为一个基于内存的分布式计算引擎,其内存管理模块在整个系统中扮演着非常重要的角色。理解 Spark 内存管理的基本原理,有助于更好地开发 Spark 应用程序和进行性能调优。本文介绍了Spark的内存框架以及介绍了Spark的哪些模块使用到了内存管理功能,Driver端内存管理比较简单,所以后续的分析都专注于Executor端的内存管理。
内存整体框架
作为一个 JVM 进程,Executor 的内存管理建立在 JVM 的内存管理之上,Spark 对 JVM 的堆内[On-heap]空间进行了更为详细的分配,以充分利用内存。同时,Spark 引入了堆外[Off-heap]内存,使之可以直接在工作节点的系统内存中开辟空间,进一步优化了内存的使用,整体架构如下所示:
对于一个Executor中,内存的相关结构如下所示:
-
org.apache.spark.memory.MemoryManager是JVM级别的内存管理者,负责具体内存的分配和回收; -
org.apache.spark.memory.TaskMemoryManager是任务级别的内存管理者,每个任务有一个TaskMemoryManager,它负责Spark任务内存的分配和回收,任务的内存申请通过该管理者跟MemeoryManager进行交互; -
org.apache.spark.memory.MemoryConsumer是内存的申请的client,发送请求到TaskMemoryManager,然后经过MemoryManager的内存分配和回收操作; -
org.apache.spark.memory.MemoryPool内存池,分为存储内存池和执行内存池,是Spark内存的最基本的抽象,由MemoryManager进行内存的协调。
堆内内存
当我们提交Spark任务时候,会指定–executor-memory 或 spark.executor.memory参数,该部分内存即为堆内内存的大小。Executor内并发运行的任务<由-executor-core或者spark.executor.core参数控制>共享 JVM 堆内内存,Spark任务执行过程中缓存的RDD数据和广播数据占用的内存被规划为存储[Storage]内存,Spark任务在执行Shuffle、Join、Aggregation时占用的内存被规划为执行[Execution]内存,剩余的部分不做特殊规划,那些Spark内部的对象实例,或者用户定义的 Spark 应用程序中的对象实例,均占用剩余的空间。
Spark 对堆内内存的管理是一种逻辑上的”规划式”的管理,因为对象实例占用内存的申请和释放都由 JVM 完成,Spark 只能在申请后和释放前记录这些内存。
堆外内存
为了进一步优化内存的使用以及提高 Shuffle 时排序的效率,Spark 引入了堆外[Off-heap]内存,使之可以直接在工作节点的系统内存中开辟空间,存储经过序列化的二进制数据。利用 JDK Unsafe API<实质上等同于C++中可以自己进行内存管理>,这样子减少了不必要的内存开销<java对象有对象头等开销>,以及JVM的频繁GC扫描和回收,提升了处理性能。堆外内存可以被精确地申请和释放,而且序列化的数据占用的空间可以被精确计算,所以相比堆内内存来说降低了管理的难度,也降低了误差。在默认情况下堆外内存并不启用,可通过配置spark.memory.offHeap.enabled 参数启用,并由spark.memory.offHeap.size 参数设定堆外空间的大小。除了没有 other 空间,堆外内存与堆内内存的划分方式相同,所有运行中的并发任务共享存储内存和执行内存。
哪些情况下会使用内存
Spark中用到内存的地方有哪些?存储内存主要消耗在哪些地方?执行内存主要消耗在哪些地方?
存储内存
首先我们来看看存储内存,Spark任务中数据的缓存或者广播数据以及RDD缓存副本大于1时候会使用到存储内存。
Cache/persist存储内存申请
当RDD的Storage Level包括memory时[也就是调用了RDD.cache或RDD.persist将RDD数据缓存到了memory中],Task在计算得到RDD分区数据时会申请存储内存将数据缓存在内存中。
-
ShuffleMapTask/ResultTask.runTask -> RDD.iterator -> RDD.getOrCompute -> BlockManager.getOrElseUpdate -> BlockManager.doPutIterator -> MemoryStore.putIteratorAsBytes/MemoryStore.putIteratorAsValues -> MemoryStore.putIterator
-
ShuffleMapTask/ResultTask.runTask -> RDD.iterator -> RDD.getOrCompute -> BlockManager.getOrElseUpdate -> BlockManager.getLocalValues -> BlockManager.maybeCacheDiskValuesInMemory -> MemoryStore.putIteratorAsValues -> MemoryStore.putIterator
-
ShuffleMapTask/ResultTask.runTask -> RDD.iterator -> RDD.getOrCompute -> RDD.computeOrReadCheckpoint -> WriteAheadLogBackedBlockRDD.compute -> WriteAheadLogBackedBlockRDD.compute$getBlockFromWriteAheadLog -> blockManager.putBytes -> BlockManager.doPutBytes -> MemoryStore.putIteratorAsValues -> MemoryStore.putIterator
Broadcast存储内存申请
对广播变量进行存储/缓存也会用到存储内存,写入和读取的代码调用路径如下所示:
-
TorrentBroadcast.writeBlocks -> BlockManager.putSingle -> BlockManager.putIterator -> BlockManager.doPutIterator -> MemoryStore.putIteratorAsValues/MemoryStore.putIteratorAsValues -> MemoryStore.putIterator
-
TorrentBroadcast.readBroadcastBlock -> BlockManager.getLocalValues -> BlockManager.maybeCacheDiskValuesInMemory -> MemoryStore.putIteratorAsValues -> MemoryStore.putIterator
RDD block Replication存储内存申请
当RDD的storage level中的_replication大于1时,BlockManager需要将block数据发到另一个远程结点以备份,此时BlockManager会向远程结点发送UploadBlock消息,远程结点在收到该消息后会申请存储内存以存放收到的block数据。
-
NettyBlockRpcServer.receive -> BlockDataManager.putBlockData -> BlockManager.putBytes -> BlockManager.doPutBytes -> MemoryStore.putIteratorAsValues -> MemoryStore.putIterator
-
NettyBlockRpcServer.receiveStream -> BlockDataManager.putBlockDataAsStream -> BlockManager.putBytes -> BlockManager.doPutBytes -> MemoryStore.putIteratorAsValues -> MemoryStore.putIterator
Task运行结果数据相关的存储内存申请
TaskRunner处理task结果数据时,如果task结果数据大于maxDirectResultSize,则会将其存储到本地blockManager,然后将block的meta数据返回给driver,并且这个时候用的storeage level是MEMORY_AND_DISK_SER, 所以会向MemoryManager申请存储内存。
- TaskRunner.run -> BlockManager.putBytes -> BlockManager.doPutBytes -> MemoryStore.putIteratorAsValues -> MemoryStore.putIterator
执行内存
执行内存主要用于Spark任务在内存中进行Shuffle、Join、Sort以及Aggregations等计算操作,主要是各种MemoryConsumer向TaskMemoryManager申请内存,具体的后续文章会详细介绍。
参考
- https://www.jianshu.com/p/87a36488993a
- https://www.jianshu.com/p/f29fc887e89f
- https://developer.ibm.com/zh/articles/ba-cn-apache-spark-memory-management/