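The previous post, "A Novice's Journey Through the Spark Source Code - 6: Memory Management Source - Part 1: Feature Overview", gave an overall picture of how Spark implements memory management. This time we start from MemoryManager and look at Spark memory management in more depth.
First, the structure of MemoryManager:
It holds several important data structures: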
// -- Methods related to memory allocation policies and bookkeeping ------------------------------
@GuardedBy("this")
protected val onHeapStorageMemoryPool = new StorageMemoryPool(this, MemoryMode.ON_HEAP)
@GuardedBy("this")
protected val offHeapStorageMemoryPool = new StorageMemoryPool(this, MemoryMode.OFF_HEAP)
@GuardedBy("this")
protected val onHeapExecutionMemoryPool = new ExecutionMemoryPool(this, MemoryMode.ON_HEAP)
@GuardedBy("this")
protected val offHeapExecutionMemoryPool = new ExecutionMemoryPool(this, MemoryMode.OFF_HEAP)
onHeapStorageMemoryPool.incrementPoolSize(onHeapStorageMemory)
onHeapExecutionMemoryPool.incrementPoolSize(onHeapExecutionMemory)
protected[this] val maxOffHeapMemory = conf.get(MEMORY_OFFHEAP_SIZE)
protected[this] val offHeapStorageMemory =
  (maxOffHeapMemory * conf.getDouble("spark.memory.storageFraction", 0.5)).toLong
offHeapExecutionMemoryPool.incrementPoolSize(maxOffHeapMemory - offHeapStorageMemory)
offHeapStorageMemoryPool.incrementPoolSize(offHeapStorageMemory)
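This code mainly sets up four pools: storage and execution memory, each split into on-heap and off-heap. The on-heap sizes are passed in by the subclass, while the off-heap total is divided using spark.memory.storageFraction.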
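To make the off-heap split concrete, here is a minimal sketch of the same arithmetic outside Spark. The object name `OffHeapSplitSketch` and the function `split` are hypothetical names for illustration only; the formula mirrors the lines above, where storage gets `maxOffHeapMemory * storageFraction` (truncated to a Long) and execution gets the remainder.

```scala
// Illustrative only: reproduces the off-heap sizing arithmetic shown above.
// `OffHeapSplitSketch` and `split` are hypothetical names, not Spark APIs.
object OffHeapSplitSketch {

  /** Returns (storageBytes, executionBytes) for a given off-heap budget. */
  def split(maxOffHeapMemory: Long, storageFraction: Double = 0.5): (Long, Long) = {
    // Same truncation as `(maxOffHeapMemory * fraction).toLong` in the excerpt.
    val storage = (maxOffHeapMemory * storageFraction).toLong
    // Execution receives whatever storage did not take, so the two always add up.
    val execution = maxOffHeapMemory - storage
    (storage, execution)
  }

  def main(args: Array[String]): Unit = {
    val (storage, execution) = split(1024L * 1024 * 1024)
    println(s"storage=$storage bytes, execution=$execution bytes")
  }
}
```

Because execution is computed as the remainder, any rounding loss from the `toLong` truncation ends up on the execution side rather than disappearing.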
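The previous post mentioned the two main implementations of MemoryManager, UnifiedMemoryManager and StaticMemoryManager, as shown in the figure below:
Since StaticMemoryManager exists mainly for backward compatibility, we focus on UnifiedMemoryManager.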
/**
 * A [[MemoryManager]] that enforces a soft boundary between execution and storage such that
 * either side can borrow memory from the other.
 *
 * The region shared between execution and storage is a fraction of (the total heap space - 300MB)
 * configurable through `spark.memory.fraction` (default 0.6). The position of the boundary
 * within this space is further determined by `spark.memory.storageFraction` (default 0.5).
 * This means the size of the storage region is 0.6 * 0.5 = 0.3 of the heap space by default.
 * Note: the shared region defaults to 0.6 of (heap - 300MB), and the boundary inside it is set
 * by spark.memory.storageFraction, so the storage region is about 0.3 of the heap by default.
 *
 * Storage can borrow as much execution memory as is free until execution reclaims its space.
 * When this happens, cached blocks will be evicted from memory until sufficient borrowed
 * memory is released to satisfy the execution memory request.
 * Note: storage may use any free execution memory. When execution wants that space back,
 * cached blocks are evicted until enough of the borrowed memory has been released.
 *
 * Similarly, execution can borrow as much storage memory as is free. However, execution
 * memory is *never* evicted by storage due to the complexities involved in implementing this.
 * The implication is that attempts to cache blocks may fail if execution has already eaten
 * up most of the storage space, in which case the new blocks will be evicted immediately
 * according to their respective storage levels.
 * Note: execution can borrow free storage memory in the same way, but storage can never force
 * execution to give memory back. If execution has taken most of the storage space, caching a
 * block may fail, and the new block is then handled according to its storage level.
 *
 * @param onHeapStorageRegionSize Size of the storage region, in bytes.
 *                                This region is not statically reserved; execution can borrow from
 *                                it if necessary. Cached blocks can be evicted only if actual
 *                                storage memory usage exceeds this region.
 */
private[spark] class UnifiedMemoryManager private[memory] (
    conf: SparkConf,
    val maxHeapMemory: Long,
    onHeapStorageRegionSize: Long,
    numCores: Int)
  extends MemoryManager(
    conf,
    numCores,
    onHeapStorageRegionSize,
    maxHeapMemory - onHeapStorageRegionSize)
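The sizing rule described in the comment above can be worked through with concrete numbers. The following is a rough sketch, not Spark's actual code: `UnifiedRegionSketch`, `regions`, and the `systemMemory` parameter are hypothetical names, and the 300MB reserve and the two default fractions are taken from the doc comment.

```scala
// Illustrative only: the arithmetic from the doc comment, with hypothetical names.
object UnifiedRegionSketch {
  // The "300MB" mentioned in the doc comment, expressed in bytes.
  val ReservedBytes: Long = 300L * 1024 * 1024

  /**
   * Returns (sharedRegionBytes, storageRegionBytes) for a heap of `systemMemory` bytes.
   * `memoryFraction` stands in for spark.memory.fraction and `storageFraction`
   * for spark.memory.storageFraction.
   */
  def regions(
      systemMemory: Long,
      memoryFraction: Double = 0.6,
      storageFraction: Double = 0.5): (Long, Long) = {
    val usable = systemMemory - ReservedBytes
    // Region shared by execution and storage.
    val shared = (usable * memoryFraction).toLong
    // Position of the soft boundary inside the shared region.
    val storageRegion = (shared * storageFraction).toLong
    (shared, storageRegion)
  }

  def main(args: Array[String]): Unit = {
    val heap = 4L * 1024 * 1024 * 1024 // a 4GB heap, chosen arbitrarily
    val (shared, storageRegion) = regions(heap)
    println(s"shared=$shared bytes, storage region=$storageRegion bytes")
  }
}
```

Note that the "0.3 of the heap" figure in the comment is an approximation: because of the 300MB reserve, the storage region is 0.3 of (heap - 300MB), which matters on small heaps.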
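Two methods compute the maximum memory that storage could use, on-heap and off-heap. In both cases this is the total memory minus whatever execution is currently using: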
override def maxOnHeapStorageMemory: Long = synchronized {
  maxHeapMemory - onHeapExecutionMemoryPool.memoryUsed
}
override def maxOffHeapStorageMemory: Long = synchronized {
  maxOffHeapMemory - offHeapExecutionMemoryPool.memoryUsed
}
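A small sketch may help show why this value moves at run time. The names `MaxStorageSketch` and `maxStorageMemory` are hypothetical; the subtraction is the one used by the two methods above.

```scala
// Illustrative only: the storage ceiling shrinks as execution uses more memory.
// `MaxStorageSketch` and `maxStorageMemory` are hypothetical names.
object MaxStorageSketch {

  /** Upper bound on storage memory given the total and execution's current usage. */
  def maxStorageMemory(maxMemory: Long, executionMemoryUsed: Long): Long = {
    require(executionMemoryUsed >= 0 && executionMemoryUsed <= maxMemory,
      "execution usage must lie within [0, maxMemory]")
    maxMemory - executionMemoryUsed
  }

  def main(args: Array[String]): Unit = {
    val total = 1000L
    // The same total yields a smaller storage ceiling once execution is busy.
    Seq(0L, 300L, 900L).foreach { used =>
      println(s"execution used=$used, max storage=${maxStorageMemory(total, used)}")
    }
  }
}
```

The ceiling is not a fixed configuration value: it depends on how much execution memory is in use at the moment of the call, which is why the real methods are `synchronized`.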
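1. Acquiring execution memory
This method tries to acquire up to numBytes of execution memory for the current task and returns the number of bytes actually obtained, or 0 if nothing could be allocated.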
/**
 * Try to acquire up to `numBytes` of execution memory for the current task and return the
 * number of bytes obtained, or 0 if none can be allocated.
 *
 * This call may block until there is enough free memory in some situations, to make sure each
 * task has a chance to ramp up to at least 1 / 2N of the total memory pool (where N is the # of
 * active tasks) before it is forced to spill. This can happen if the number of tasks increase
 * but an older task had a lot of memory already.
 */
override private[memory] def acquireExecutionMemory(
    numBytes: Long,
    taskAttemptId: Long,
    memoryMode: MemoryMode): Long = synchronized {
  assertInvariants()
  assert(numBytes >= 0)
  // Select the pools and limits that match the requested memory mode
  val (executionPool, storagePool, storageRegionSize, maxMemory) = memoryMode match {
    case MemoryMode.ON_HEAP => (
      onHeapExecutionMemoryPool,
      onHeapStorageMemoryPool,
      onHeapStorageRegionSize,
      maxHeapMemory)
    case MemoryMode.OFF_HEAP => (
      offHeapExecutionMemoryPool,
      offHeapStorageMemoryPool,
      offHeapStorageMemory,
      maxOffHeapMemory)
  }
  /**
   * Grow the execution pool by evicting cached blocks, thereby shrinking the storage pool.
   *
   * When acquiring memory for a task, the execution pool may need to make multiple
   * attempts. Each attempt must be able to evict storage in case another task jumps in
   * and caches a large block between the attempts. This is called once per attempt.
   */
  def maybeGrowExecutionPool(extraMemoryNeeded: Long): Unit = {
    if (extraMemoryNeeded > 0) {
      // There is not enough free memory in the execution pool, so try to reclaim memory from
      // storage. We can reclaim any free memory from the storage pool. If the storage pool
      // has grown to become larger than `storageRegionSize`, we can evict blocks and reclaim
      // the memory that storage has borrowed from execution.
      // Note: there are two sources of reclaimable memory:
      // 1. memory that is currently free in the storage pool
      // 2. memory that storage borrowed beyond `storageRegionSize`, freed by evicting blocks
      val memoryReclaimableFromStorage = math.max(
        storagePool.memoryFree,
        storagePool.poolSize - storageRegionSize)
      if (memoryReclaimableFromStorage > 0) {
        // Only reclaim as much space as is necessary and available:
        val spaceToReclaim = storagePool.freeSpaceToShrinkPool(
          math.min(extraMemoryNeeded, memoryReclaimableFromStorage))
        // Shrink the storage pool
        storagePool.decrementPoolSize(spaceToReclaim)
        // Grow the execution pool by the same amount
        executionPool.incrementPoolSize(spaceToReclaim)
      }
    }
  }
  /**
   * The size the execution pool would have after evicting storage memory.
   *
   * The execution memory pool divides this quantity among the active tasks evenly to cap
   * the execution memory allocation for each task. It is important to keep this greater
   * than the execution pool size, which doesn't take into account potential memory that
   * could be freed by evicting storage. Otherwise we may hit SPARK-12155.
   *
   * Additionally, this quantity should be kept below `maxMemory` to arbitrate fairness
   * in execution memory allocation across tasks, Otherwise, a task may occupy more than
   * its fair share of execution memory, mistakenly thinking that other tasks can acquire
   * the portion of storage memory that cannot be evicted.
   *
   * Note: in short, the cap includes storage memory that could be evicted, but excludes the
   * part of storage (up to `storageRegionSize`) that execution can never take.
   */
  def computeMaxExecutionPoolSize(): Long = {
    maxMemory - math.min(storagePool.memoryUsed, storageRegionSize)
  }
  // Delegate the actual acquisition to the execution pool
  executionPool.acquireMemory(
    numBytes, taskAttemptId, maybeGrowExecutionPool, () => computeMaxExecutionPoolSize)
}
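There are two nested methods here: one tries to reclaim storage memory in order to grow the execution pool, and the other computes the maximum size the execution pool could reach once evictable storage memory is counted.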
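The two formulas are easier to follow with numbers. Below is a simplified sketch, assuming a toy `Pool` case class that tracks only a size and the bytes in use; `ExecutionGrowthSketch`, `Pool`, `reclaimableFromStorage`, and `maxExecutionPoolSize` are hypothetical names, and no blocks are actually evicted.

```scala
// Illustrative only: a toy model of the two nested helpers, with hypothetical names.
object ExecutionGrowthSketch {

  /** Toy pool that tracks only its size and the bytes in use. */
  final case class Pool(poolSize: Long, memoryUsed: Long) {
    def memoryFree: Long = poolSize - memoryUsed
  }

  /**
   * Upper bound on what execution may take from storage: either the memory that is
   * simply free, or the amount by which storage has grown past its region.
   */
  def reclaimableFromStorage(storage: Pool, storageRegionSize: Long): Long =
    math.max(storage.memoryFree, storage.poolSize - storageRegionSize)

  /** Size the execution pool could reach if all evictable storage were evicted. */
  def maxExecutionPoolSize(maxMemory: Long, storage: Pool, storageRegionSize: Long): Long =
    maxMemory - math.min(storage.memoryUsed, storageRegionSize)

  def main(args: Array[String]): Unit = {
    val maxMemory = 1000L
    val region = 500L
    // Storage has borrowed 200 bytes from execution and is nearly full.
    val borrowed = Pool(poolSize = 700L, memoryUsed = 650L)
    println(reclaimableFromStorage(borrowed, region))
    println(maxExecutionPoolSize(maxMemory, borrowed, region))
  }
}
```

In the `borrowed` case only 50 bytes are free, yet 200 bytes count as reclaimable, because everything storage holds beyond its region can be evicted. The execution cap stays at `maxMemory - storageRegionSize`, since the blocks inside the storage region are protected.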
2. Acquiring storage memory
override def acquireStorageMemory(
    blockId: BlockId,
    numBytes: Long,
    memoryMode: MemoryMode): Boolean = synchronized {
  assertInvariants()
  assert(numBytes >= 0)
  val (executionPool, storagePool, maxMemory) = memoryMode match {
    case MemoryMode.ON_HEAP => (
      onHeapExecutionMemoryPool,
      onHeapStorageMemoryPool,
      maxOnHeapStorageMemory)
    case MemoryMode.OFF_HEAP => (
      offHeapExecutionMemoryPool,
      offHeapStorageMemoryPool,
      maxOffHeapStorageMemory)
  }
  // The request exceeds the maximum storage memory, so it can never be satisfied
  if (numBytes > maxMemory) {
    // Fail fast if the block simply won't fit
    logInfo(s"Will not store $blockId as the required space ($numBytes bytes) exceeds our " +
      s"memory limit ($maxMemory bytes)")
    return false
  }
  // Not enough free storage memory: try to borrow free memory from execution
  if (numBytes > storagePool.memoryFree) {
    // There is not enough free memory in the storage pool, so try to borrow free memory from
    // the execution pool.
    // Borrow only the shortfall, and never more than execution currently has free
    val memoryBorrowedFromExecution = Math.min(executionPool.memoryFree,
      numBytes - storagePool.memoryFree)
    executionPool.decrementPoolSize(memoryBorrowedFromExecution)
    storagePool.incrementPoolSize(memoryBorrowedFromExecution)
  }
  storagePool.acquireMemory(blockId, numBytes)
}
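This flow is broadly similar to acquiring execution memory, with one important difference: storage only borrows execution memory that is currently free, moving it from the execution pool to the storage pool. It never evicts memory that execution is already using.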
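The borrowing step can be sketched on its own, assuming a toy `Pools` case class that holds only sizes and usage for the two pools. `StorageBorrowSketch`, `Pools`, and `borrowForStorage` are hypothetical names; the real method additionally hands the request to the storage pool, which may evict cached blocks.

```scala
// Illustrative only: the pool-resizing step of storage acquisition, hypothetical names.
object StorageBorrowSketch {

  /** Sizes and usage of the two pools, in bytes. */
  final case class Pools(
      executionSize: Long,
      executionUsed: Long,
      storageSize: Long,
      storageUsed: Long) {
    def executionFree: Long = executionSize - executionUsed
    def storageFree: Long = storageSize - storageUsed
  }

  /** Moves free execution memory to storage, but only as much as the shortfall. */
  def borrowForStorage(p: Pools, numBytes: Long): Pools = {
    if (numBytes <= p.storageFree) {
      p // storage already has room, nothing to borrow
    } else {
      // Never take more than execution has free: memory in use by tasks stays put.
      val borrowed = math.min(p.executionFree, numBytes - p.storageFree)
      p.copy(
        executionSize = p.executionSize - borrowed,
        storageSize = p.storageSize + borrowed)
    }
  }

  def main(args: Array[String]): Unit = {
    val busy = Pools(executionSize = 500L, executionUsed = 450L,
      storageSize = 500L, storageUsed = 400L)
    println(borrowForStorage(busy, 200L))
  }
}
```

In the `busy` case storage needs 100 more bytes but execution has only 50 free, so the storage pool grows to 550 and is still 50 bytes short. At that point it is up to the storage pool to free space from its own cached blocks or to fail the request.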
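To recap: MemoryManager maintains Spark's memory, which is divided into execution memory and storage memory, each with an on-heap and an off-heap part. It exposes two entry points, one for acquiring execution memory and one for acquiring storage memory. The actual acquisition is carried out by the MemoryPool that backs each kind of memory. In the next post we will look at how a MemoryPool manages its own region of memory.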