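Last time we walked through the source of MemoryPool in Spark's memory management. This time we work from the bottom up and study Spark's storage management. The storage package is laid out as follows:
[Figure: layout of the storage package]
We start with how blocks are managed. As the package listing shows, block management consists mainly of two parts: BlockManagerMaster and BlockManager. Like MemoryManager, BlockManager is a component of SparkEnv, as shown in the figure below:
[Figure: components of SparkEnv]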
1. BlockManagerMaster
Before SparkEnv creates the BlockManager, it first creates a BlockManagerMaster:
val blockManagerMaster = new BlockManagerMaster(registerOrLookupEndpoint(
  BlockManagerMaster.DRIVER_ENDPOINT_NAME,
  new BlockManagerMasterEndpoint(rpcEnv, isLocal, conf, listenerBus)),
  conf, isDriver)

// NB: blockManager is not valid until initialize() is called later.
val blockManager = new BlockManager(executorId, rpcEnv, blockManagerMaster,
  serializerManager, conf, memoryManager, mapOutputTracker, shuffleManager,
  blockTransferService, securityManager, numUsableCores)
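This calls the following method:
def registerOrLookupEndpoint(
    name: String, endpointCreator: => RpcEndpoint):
  RpcEndpointRef = {
  if (isDriver) {
    logInfo("Registering " + name)
    rpcEnv.setupEndpoint(name, endpointCreator)
  } else {
    RpcUtils.makeDriverRef(name, conf, rpcEnv)
  }
}
On the driver, the endpoint is created and registered with the RpcEnv, which returns its RpcEndpointRef. On any other node, a reference to the driver's existing endpoint is looked up instead.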
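The by-name parameter (endpointCreator: => RpcEndpoint) matters here: the endpoint is only constructed when the driver branch is taken. The following is a minimal sketch of that pattern, not Spark code. Endpoint, EndpointRef, registerOrLookup, newEndpoint and the creations counter are all hypothetical stand-ins introduced for illustration.

```scala
// Hypothetical stand-ins for RpcEndpoint / RpcEndpointRef; these are not Spark classes.
object EndpointLookupSketch {
  final case class Endpoint(name: String)
  final case class EndpointRef(name: String, createdLocally: Boolean)

  // Counts how many times an endpoint was actually constructed.
  var creations = 0

  def newEndpoint(name: String): Endpoint = {
    creations += 1
    Endpoint(name)
  }

  // `creator` is a by-name parameter: it is evaluated only where it is used.
  def registerOrLookup(isDriver: Boolean, name: String, creator: => Endpoint): EndpointRef =
    if (isDriver) {
      val endpoint = creator // evaluated here, on the driver branch only
      EndpointRef(endpoint.name, createdLocally = true)
    } else {
      // The executor branch never touches `creator`; it only builds a reference by name.
      EndpointRef(name, createdLocally = false)
    }
}
```

Calling registerOrLookup(false, "BlockManagerMaster", newEndpoint("BlockManagerMaster")) leaves creations at 0, because the third argument is never evaluated on that branch.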
BlockManagerMaster mainly provides methods for managing executors, blocks, and BlockManagers.
1.1 Registering a BlockManager
/**
 * Register the BlockManager's id with the driver. The input BlockManagerId does not contain
 * topology information. This information is obtained from the master and we respond with an
 * updated BlockManagerId fleshed out with this information.
 */
def registerBlockManager(
    blockManagerId: BlockManagerId,
    maxOnHeapMemSize: Long,
    maxOffHeapMemSize: Long,
    slaveEndpoint: RpcEndpointRef): BlockManagerId = {
  logInfo(s"Registering BlockManager $blockManagerId")
  // Ask the driver endpoint to register this BlockManager and wait for the reply
  val updatedId = driverEndpoint.askSync[BlockManagerId](
    RegisterBlockManager(blockManagerId, maxOnHeapMemSize, maxOffHeapMemSize, slaveEndpoint))
  logInfo(s"Registered BlockManager $updatedId")
  updatedId
}
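Here the BlockManager sends a registration request to the BlockManagerMaster. On the receiving side, the request is handled in BlockManagerMasterEndpoint.receiveAndReply, which pattern-matches on each kind of request: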
override def receiveAndReply(context: RpcCallContext): PartialFunction[Any, Unit] = {
  case RegisterBlockManager(blockManagerId, maxOnHeapMemSize, maxOffHeapMemSize, slaveEndpoint) =>
    context.reply(register(blockManagerId, maxOnHeapMemSize, maxOffHeapMemSize, slaveEndpoint))
  case _updateBlockInfo @
      UpdateBlockInfo(blockManagerId, blockId, storageLevel, deserializedSize, size) =>
    context.reply(updateBlockInfo(blockManagerId, blockId, storageLevel, deserializedSize, size))
    listenerBus.post(SparkListenerBlockUpdated(BlockUpdatedInfo(_updateBlockInfo)))
  case GetLocations(blockId) =>
    context.reply(getLocations(blockId))
  case GetLocationsAndStatus(blockId) =>
    context.reply(getLocationsAndStatus(blockId))
  case GetLocationsMultipleBlockIds(blockIds) =>
    context.reply(getLocationsMultipleBlockIds(blockIds))
  case GetPeers(blockManagerId) =>
    context.reply(getPeers(blockManagerId))
  case GetExecutorEndpointRef(executorId) =>
    context.reply(getExecutorEndpointRef(executorId))
  case GetMemoryStatus =>
    context.reply(memoryStatus)
  case GetStorageStatus =>
    context.reply(storageStatus)
  case GetBlockStatus(blockId, askSlaves) =>
    context.reply(blockStatus(blockId, askSlaves))
  case GetMatchingBlockIds(filter, askSlaves) =>
    context.reply(getMatchingBlockIds(filter, askSlaves))
  case RemoveRdd(rddId) =>
    context.reply(removeRdd(rddId))
  case RemoveShuffle(shuffleId) =>
    context.reply(removeShuffle(shuffleId))
  case RemoveBroadcast(broadcastId, removeFromDriver) =>
    context.reply(removeBroadcast(broadcastId, removeFromDriver))
  case RemoveBlock(blockId) =>
    removeBlockFromWorkers(blockId)
    context.reply(true)
  case RemoveExecutor(execId) =>
    removeExecutor(execId)
    context.reply(true)
  case StopBlockManagerMaster =>
    context.reply(true)
    stop()
  case BlockManagerHeartbeat(blockManagerId) =>
    context.reply(heartbeatReceived(blockManagerId))
  case HasCachedBlocks(executorId) =>
    blockManagerIdByExecutor.get(executorId) match {
      case Some(bm) =>
        if (blockManagerInfo.contains(bm)) {
          val bmInfo = blockManagerInfo(bm)
          context.reply(bmInfo.cachedBlocks.nonEmpty)
        } else {
          context.reply(false)
        }
      case None => context.reply(false)
    }
}
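The other operations follow much the same flow: each one is carried out through a remote RPC request. The RPC messages are all defined as case classes.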
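To make the message style concrete, here is a minimal sketch of case-class messages dispatched by a partial function, in the same shape as receiveAndReply. It is an illustration only: the message names (RegisterNode, LookupBlock, DropBlock, QueryMemory) and the toy Master class are hypothetical and deliberately not Spark's own definitions, and replies are returned directly instead of going through an RpcCallContext.

```scala
object MessageDispatchSketch {
  // Hypothetical messages: one case class (or case object) per kind of request.
  sealed trait ToMaster
  final case class RegisterNode(executorId: String, maxMem: Long) extends ToMaster
  final case class LookupBlock(blockId: String) extends ToMaster
  final case class DropBlock(blockId: String) extends ToMaster
  case object QueryMemory extends ToMaster

  // Toy master state: block id -> executors holding it, executor id -> max memory.
  final class Master {
    private val locations = scala.collection.mutable.Map.empty[String, Set[String]]
    private val memory = scala.collection.mutable.Map.empty[String, Long]

    def addBlock(blockId: String, executorId: String): Unit =
      locations(blockId) = locations.getOrElse(blockId, Set.empty[String]) + executorId

    // A partial function from message to reply; unknown messages are simply not matched.
    val receiveAndReply: PartialFunction[Any, Any] = {
      case RegisterNode(executorId, maxMem) =>
        memory(executorId) = maxMem
        executorId
      case LookupBlock(blockId) =>
        locations.getOrElse(blockId, Set.empty[String])
      case DropBlock(blockId) =>
        locations.remove(blockId)
        true
      case QueryMemory =>
        memory.toMap
    }
  }
}
```

Because every request is a plain immutable value, adding a new operation in this style means adding one case class and one case clause.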
2. BlockManager
/**
 * Manager running on every node (driver and executors) which provides interfaces for putting and
 * retrieving blocks both locally and remotely into various stores (memory, disk, and off-heap).
 *
 * Note that [[initialize()]] must be called before the BlockManager is usable.
 */
private[spark] class BlockManager(
    executorId: String,
    rpcEnv: RpcEnv,
    val master: BlockManagerMaster,
    val serializerManager: SerializerManager,
    val conf: SparkConf,
    memoryManager: MemoryManager,
    mapOutputTracker: MapOutputTracker,
    shuffleManager: ShuffleManager,
    val blockTransferService: BlockTransferService,
    securityManager: SecurityManager,
    numUsableCores: Int)
2.1 Initialization
Next, let's look at the initialize method:
/**
 * Initializes the BlockManager with the given appId. This is not performed in the constructor as
 * the appId may not be known at BlockManager instantiation time (in particular for the driver,
 * where it is only learned after registration with the TaskScheduler).
 *
 * This method initializes the BlockTransferService and ShuffleClient, registers with the
 * BlockManagerMaster, starts the BlockManagerWorker endpoint, and registers with a local shuffle
 * service if configured.
 */
def initialize(appId: String): Unit = {
  blockTransferService.init(this)
  shuffleClient.init(appId)

  blockReplicationPolicy = {
    val priorityClass = conf.get(
      "spark.storage.replication.policy", classOf[RandomBlockReplicationPolicy].getName)
    val clazz = Utils.classForName(priorityClass)
    val ret = clazz.newInstance.asInstanceOf[BlockReplicationPolicy]
    logInfo(s"Using $priorityClass for block replication policy")
    ret
  }

  val id =
    BlockManagerId(executorId, blockTransferService.hostName, blockTransferService.port, None)

  val idFromMaster = master.registerBlockManager(
    id,
    maxOnHeapMemory,
    maxOffHeapMemory,
    slaveEndpoint)

  blockManagerId = if (idFromMaster != null) idFromMaster else id

  shuffleServerId = if (externalShuffleServiceEnabled) {
    logInfo(s"external shuffle service port = $externalShuffleServicePort")
    BlockManagerId(executorId, blockTransferService.hostName, externalShuffleServicePort)
  } else {
    blockManagerId
  }

  // Register Executors' configuration with the local shuffle service, if one should exist.
  if (externalShuffleServiceEnabled && !blockManagerId.isDriver) {
    registerWithExternalShuffleServer()
  }

  logInfo(s"Initialized BlockManager: $blockManagerId")
}
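The reason initialize is separate from the constructor is that the appId is not yet known when the object is built. A minimal sketch of that two-phase pattern follows. The Manager class, its managerId accessor and the id format are hypothetical, and the explicit "not initialized" check is an addition of this sketch, not something shown in the source above.

```scala
object TwoPhaseInitSketch {
  // Hypothetical component whose identity depends on a value known only after construction.
  final class Manager(executorId: String) {
    private var id: Option[String] = None

    // Second phase: called once the application id is known.
    def initialize(appId: String): Unit = {
      id = Some(s"$appId/$executorId")
    }

    // Any use before initialize() fails loudly instead of returning a half-built value.
    def managerId: String =
      id.getOrElse(throw new IllegalStateException("initialize() has not been called"))
  }
}
```

With this shape, new Manager("driver") can be created early, and managerId only becomes usable after initialize("app-1") has run.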
2.2 Fetching block data
Next, let's look at the interface for fetching local block data:
/**
 * Interface to get local block data. Throws an exception if the block cannot be found or
 * cannot be read successfully.
 */
override def getBlockData(blockId: BlockId): ManagedBuffer = {
  if (blockId.isShuffle) {
    shuffleManager.shuffleBlockResolver.getBlockData(blockId.asInstanceOf[ShuffleBlockId])
  } else {
    getLocalBytes(blockId) match {
      case Some(blockData) =>
        new BlockManagerManagedBuffer(blockInfoManager, blockId, blockData, true)
      case None =>
        // If this block manager receives a request for a block that it doesn't have then it's
        // likely that the master has outdated block statuses for this block. Therefore, we send
        // an RPC so that this block is marked as being unavailable from this block manager.
        reportBlockStatus(blockId, BlockStatus.empty)
        throw new BlockNotFoundException(blockId.toString)
    }
  }
}
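The None branch above does two things: it corrects the master's stale view, then fails the request. A minimal sketch of that "report, then throw" shape, under simplifying assumptions: Store, BlockNotFound and reportedMissing are hypothetical, and the report is just appended to a local buffer instead of being sent as an RPC.

```scala
object LocalLookupSketch {
  final class BlockNotFound(blockId: String)
    extends RuntimeException(s"Block $blockId not found")

  // Hypothetical local store plus a log of the "block is gone" reports meant for the master.
  final class Store(blocks: Map[String, Array[Byte]]) {
    val reportedMissing = scala.collection.mutable.ArrayBuffer.empty[String]

    def getBlockData(blockId: String): Array[Byte] =
      blocks.get(blockId) match {
        case Some(bytes) => bytes
        case None =>
          // Record that the master's view is stale before failing the request.
          reportedMissing += blockId
          throw new BlockNotFound(blockId)
      }
  }
}
```

The ordering is the point of the sketch: the report happens before the exception, so the caller's failure does not prevent the correction from being recorded.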
This calls getLocalBytes to fetch the block's data in serialized form:
/**
 * Get block from the local block manager as serialized bytes.
 */
def getLocalBytes(blockId: BlockId): Option[BlockData] = {
  logDebug(s"Getting local block $blockId as bytes")
  // As an optimization for map output fetches, if the block is for a shuffle, return it
  // without acquiring a lock; the disk store never deletes (recent) items so this should work
  if (blockId.isShuffle) {
    val shuffleBlockResolver = shuffleManager.shuffleBlockResolver
    // TODO: This should gracefully handle case where local block is not available. Currently
    // downstream code will throw an exception.
    // Try to fetch the block data
    val buf = new ChunkedByteBuffer(
      shuffleBlockResolver.getBlockData(blockId.asInstanceOf[ShuffleBlockId]).nioByteBuffer())
    Some(new ByteBufferBlockData(buf, true))
  } else {
    // Take a read lock so the data cannot change while it is being read
    blockInfoManager.lockForReading(blockId).map { info => doGetLocalBytes(blockId, info) }
  }
}
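For non-shuffle blocks, the method above takes a read lock on the data while reading it. Next, let's see how that lock is released.
Releasing is done by the releaseLock method: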
/**
 * Release a lock on the given block with explicit TID.
 * The param `taskAttemptId` should be passed in case we can't get the correct TID from
 * TaskContext, for example, the input iterator of a cached RDD iterates to the end in a child
 * thread.
 */
def releaseLock(blockId: BlockId, taskAttemptId: Option[Long] = None): Unit = {
  blockInfoManager.unlock(blockId, taskAttemptId)
}
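It delegates to blockInfoManager.unlock.
First, a word on blockInfoManager: it is the BlockManager component that tracks block metadata and manages block locks. Its locking interface is a readers-writer lock. Every lock acquisition is associated with a running task, and locks are released automatically when the task completes or fails.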
/**
 * Component of the [[BlockManager]] which tracks metadata for blocks and manages block locking.
 *
 * The locking interface exposed by this class is readers-writer lock. Every lock acquisition is
 * automatically associated with a running task and locks are automatically released upon task
 * completion or failure.
 *
 * This class is thread-safe.
 */
private[storage] class BlockInfoManager extends Logging
Now let's look at how a lock is actually released:
/**
 * Release a lock on the given block.
 * In case a TaskContext is not propagated properly to all child threads for the task, we fail to
 * get the TID from TaskContext, so we have to explicitly pass the TID value to release the lock.
 *
 * See SPARK-18406 for more discussion of this issue.
 */
def unlock(blockId: BlockId, taskAttemptId: Option[TaskAttemptId] = None): Unit = synchronized {
  val taskId = taskAttemptId.getOrElse(currentTaskAttemptId)
  logTrace(s"Task $taskId releasing lock for $blockId")
  val info = get(blockId).getOrElse {
    throw new IllegalStateException(s"Block $blockId not found")
  }
  if (info.writerTask != BlockInfo.NO_WRITER) {
    // The block is write-locked: clear the writer and drop the binding between taskId and blockId
    info.writerTask = BlockInfo.NO_WRITER
    writeLocksByTask.removeBinding(taskId, blockId)
  } else {
    // Otherwise a read lock is being released: decrement the block's reader count
    // and this task's pin count for the block
    assert(info.readerCount > 0, s"Block $blockId is not locked for reading")
    info.readerCount -= 1
    val countsForTask = readLocksByTask(taskId)
    val newPinCountForTask: Int = countsForTask.remove(blockId, 1) - 1
    assert(newPinCountForTask >= 0,
      s"Task $taskId release lock on block $blockId more times than it acquired it")
  }
  notifyAll()
}
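The bookkeeping in unlock can be sketched as a small readers-writer lock table for a single block. This is a simplified model under stated assumptions, not the BlockInfoManager implementation: LockTable, NoWriter and the method names are hypothetical, and lock attempts return false instead of blocking and waiting for notifyAll as a real readers-writer lock would.

```scala
object BlockLockSketch {
  val NoWriter: Long = -1L

  // Hypothetical, non-blocking model of the lock state for one block.
  final class LockTable {
    private var writer: Long = NoWriter
    private var readerCount: Int = 0
    // Per-task count of read locks held, defaulting to zero.
    private val readsByTask =
      scala.collection.mutable.Map.empty[Long, Int].withDefaultValue(0)

    def readers: Int = synchronized(readerCount)
    def isWriteLocked: Boolean = synchronized(writer != NoWriter)

    // Readers may share the block as long as nobody is writing.
    def lockForReading(task: Long): Boolean = synchronized {
      if (writer != NoWriter) false
      else { readerCount += 1; readsByTask(task) += 1; true }
    }

    // A writer needs exclusive access: no other writer and no readers.
    def lockForWriting(task: Long): Boolean = synchronized {
      if (writer != NoWriter || readerCount > 0) false
      else { writer = task; true }
    }

    def unlock(task: Long): Unit = synchronized {
      if (writer != NoWriter) {
        // A write lock is held, so this call releases it.
        writer = NoWriter
      } else {
        // Otherwise a read lock is released: one fewer reader overall and for this task.
        require(readerCount > 0 && readsByTask(task) > 0, s"task $task holds no read lock")
        readerCount -= 1
        readsByTask(task) -= 1
      }
    }
  }
}
```

As in the method above, the release path first checks whether a writer is recorded and only falls back to the reader counters when none is.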
That covers the process of fetching block data.