笔者在之前的文章《实战深入理解 Delta Lake 事务日志》中带大家把 Delta Lake 的事务日志大致操作了一遍,并进行了具体的分析。
有了之前的基础,笔者将在本篇文章中继续和大家一起深入研究 Delta Lake 事务日志的源码实现,学习 Delta Lake 开源项目的工程经验。
环境信息
Delta Lake Github:
https://github.com/delta-io/delta
笔者选取的版本为最新发布版本 v0.4.0,源码下载地址为:
https://github.com/delta-io/delta/releases/tag/v0.4.0
看一下 Delta Lake 项目的目录结构:
大部分代码实现都在 org.apache.spark.sql.delta 包下面。代码整体层次还是很清晰的,Scala 编程语言实现。
Delta Lake 事务日志源码分析
读者最好先大体看一下代码结构,点点看。
有没有发现什么?有的读者,可能发现了什么,不知道从哪里入手。
但是要不了多久,聪明的读者会发现 DeltaLog 这个类,打开看看。
org.apache.spark.sql.delta.DeltaLog
/*** Used to query the current state of the log as well as modify it by adding* new atomic collections of actions.** Internally, this class implements an optimistic concurrency control* algorithm to handle multiple readers or writers. Any single read* is guaranteed to see a consistent snapshot of the table.*/
DeltaLog 类的注释中有一句话很重要:
Internally, this class implements an optimistic concurrency control algorithm to handle multiple readers or writers. Any single read is guaranteed to see a consistent snapshot of the table.
大致意思为:
在内部,DeltaLog 类实现了一个乐观并发控制算法来处理并发读取或写入操作。任何一次读操作都保证看到表的一致性快照。
为了更方便的分析,我们直接看一下事务开始的代码:
/* ------------------ *| Delta Management |* ------------------ *//*** Returns a new [[OptimisticTransaction]] that can be used to read the current state of the* log and then commit updates. The reads and updates will be checked for logical conflicts* with any concurrent writes to the log.** Note that all reads in a transaction must go through the returned transaction object, and not* directly to the [[DeltaLog]] otherwise they will not be checked for conflicts.*/def startTransaction(): OptimisticTransaction = {update()new OptimisticTransaction(this)}
其实这里注释说的不是很清楚,不着急,我们接着分析。但是这里出现的 OptimisticTransaction 类是事务日志的关键类,对于事务日志的持久化都需要通过这个类,这也正是上面所提到的乐观事务,下面我们将具体分析该类。
OptimisticTransaction 类,直接看名字的意思很明确,乐观事务。该类维护了一个 case class,即 CommitStats。CommitStats 记录了一个成功的事务提交的 metrics,如下:
case class CommitStats(/** The version read by the txn when it starts. */startVersion: Long,/** The version committed by the txn. */commitVersion: Long,/** The version read by the txn right after it commits. It usually equals to commitVersion,* but can be larger than commitVersion when there are concurrent commits. */readVersion: Long,txnDurationMs: Long,commitDurationMs: Long,numAdd: Int,numRemove: Int,bytesNew: Long,/** The number of files in the table as of version `readVersion`. */numFilesTotal: Long,/** The table size in bytes as of version `readVersion`. */sizeInBytesTotal: Long,/** The protocol as of version `readVersion`. */protocol: Protocol,info: CommitInfo,newMetadata: Option[Metadata],numAbsolutePathsInAdd: Int,numDistinctPartitionsInAdd: Int,isolationLevel: String)
OptimisticTransaction 类定义如下实现内容,包含一些笔者额外标记的注释,具体分析请继续看后文:
...trait OptimisticTransactionImpl extends TransactionalWrite {...// commit 方法,参数见后文说明@throws(classOf[ConcurrentModificationException])def commit(actions: Seq[Action], op: DeltaOperations.Operation): Long = recordDeltaOperation(deltaLog,"delta.commit") {val version = try {// 事务日志提交前的准备工作// Try to commit at the next version.var finalActions = prepareCommit(actions, op)// 如果本次更新要删除之前文件,则 isBlindAppend 为 false,否则为 trueval isBlindAppend = {val onlyAddFiles =finalActions.collect { case f: FileAction => f }.forall(_.isInstanceOf[AddFile])onlyAddFiles && !dependsOnFiles}// 如果 commitInfo.enabled 参数设置为 true,则需要把 commitInfo 记录到事务日志里面if (spark.sessionState.conf.getConf(DeltaSQLConf.DELTA_COMMIT_INFO_ENABLED)) {commitInfo = CommitInfo(clock.getTimeMillis(),op.name,op.jsonEncodedValues,Map.empty,Some(readVersion).filter(_ >= 0),None,Some(isBlindAppend))finalActions = commitInfo +: finalActions}// 开始写事务日志,如果检测到任何冲突,会尝试解决逻辑冲突并使用新版本提交val commitVersion = doCommit(snapshot.version + 1, finalActions, 0)logInfo(s"Committed delta #$commitVersion to ${deltaLog.logPath}")// 对事务日志执行 checkpoint 操作postCommit(commitVersion, finalActions)commitVersion} catch {case e: DeltaConcurrentModificationException =>recordDeltaEvent(deltaLog, "delta.commit.conflict." + e.conflictType)throw ecase NonFatal(e) =>recordDeltaEvent(deltaLog, "delta.commit.failure", data = Map("exception" -> Utils.exceptionString(e)))throw e}version}...
为了方便分析和以后查看,我贴了该 commit 方法的全部实现,请读者忍受一下。这个方法非常重要,包含大部分事务日志实现的代码。
commit 方法的参数
commit 方法定义:
def commit(actions: Seq[Action], op: DeltaOperations.Operation): Long = recordDeltaOperation {...}
-
actions: Seq[Action]
记录事务记录(SetTransaction)
表更新操作产生的新文件(AddFile)
删除文件(RemoveFile)
元数据(metaData)
更新操作首次初始化protocol(Protocol)
提交信息(CommitInfo)
-
op: DeltaOperations.Operation
Delta 操作类型,包括Write、StreamingUpdate、Delete、Truncate、Update等一系列操作类型,具体请查看 DeltaOperations.scala 。
commit 方法的三部曲
整体看完 commit 方法后,主要分为三部分内容:
1. prepareCommit
2. doCommit
3. postCommit
1. prepareCommit 方法
protected def prepareCommit(actions: Seq[Action],op: DeltaOperations.Operation): Seq[Action] = {// 事务是否已经提交,增加断言assert(!committed, "Transaction already committed.")// 1. 如果更新了表的 Metadata 信息,那么需要将其写入到事务日志里面// If the metadata has changed, add that to the set of actionsvar finalActions = newMetadata.toSeq ++ actionsval metadataChanges = finalActions.collect { case m: Metadata => m }assert(metadataChanges.length <= 1,"Cannot change the metadata more than once in a transaction.")metadataChanges.foreach(m => verifyNewMetadata(m))// 2. 首次提交事务日志,那么会确保 _delta_log 目录要存在,然后检查 finalActions 里面是否有 Protocol,没有的话需要初始化 protocol 版本if (snapshot.version == -1) {deltaLog.ensureLogDirectoryExist()if (!finalActions.exists(_.isInstanceOf[Protocol])) {finalActions = Protocol() +: finalActions}}finalActions = finalActions.map {// 3. 当第一次提交,并且有 Metadata,那么会将 Delta Lake 的全局配置信息加入到 Metadata 里面case m: Metadata if snapshot.version == -1 =>val updatedConf = DeltaConfigs.mergeGlobalConfigs(spark.sessionState.conf, m.configuration, Protocol())m.copy(configuration = updatedConf)case other => other}deltaLog.protocolWrite(snapshot.protocol,logUpgradeMessage = !actions.headOption.exists(_.isInstanceOf[Protocol]))// 4. 在检查是否需要删除文件时,我们要确保这不是一个 appendOnly 表。val removes = actions.collect { case r: RemoveFile => r }if (removes.exists(_.dataChange)) deltaLog.assertRemovable()finalActions}
prepareCommit 里面的重要操作,根据代码的注释标记了1、2、3和4,具体为:
1. 由于 Delta Lake 表允许对已经存在的表模式进行修改,比如添加新列或者覆盖原有表的模式等,需要将新的 Metadata 写入到事务日志里面。Metadata 里面存储了表的 schema、分区列、表的配置、表的创建时间等信息,如下:
case class Metadata(id: String = java.util.UUID.randomUUID().toString,name: String = null,description: String = null,format: Format = Format(),schemaString: String = null,partitionColumns: Seq[String] = Nil,configuration: Map[String, String] = Map.empty,@JsonDeserialize(contentAs = classOf[java.lang.Long])createdTime: Option[Long] = Some(System.currentTimeMillis())
2. 如果是首次提交事务日志,那么先检查表的
_delta_log目录是否存在,不存在则创建。然后检查是否设置了 protocol 的版本,如果没有设置,则使用默认的协议版本,默认的协议版本中 Action.readerVersion = 1,Action.writerVersion = 2。3. 如果是第一次提交,并且存在 Metadata ,那么会将 Delta Lake 的配置信息加入到 Metadata 里面。Delta Lake 表的配置信息主要是在 org.apache.spark.sql.delta.sources.DeltaSQLConf 类里面定义的,比如可以在创建 Delta Lake 表的时候指定多久做一次 Checkpoint。
4. 可以通过 spark.databricks.delta.properties.defaults.appendOnly 参数将表设置为仅允许追加,所以如果当 actions 里面存在 RemoveFile,那么我们需要判断表是否允许删除。
prepareCommit 方法的返回值为 finalActions,这些信息就是需要写入到事务日志里面的数据。
var finalActions = prepareCommit(actions, op)val isBlindAppend = {val onlyAddFiles =finalActions.collect { case f: FileAction => f }.forall(_.isInstanceOf[AddFile])onlyAddFiles && !dependsOnFiles}
紧接着会判断这次事务变更是否需要删除之前的文件,如果是,那么 isBlindAppend 为 false,否则为 true。
if (spark.sessionState.conf.getConf(DeltaSQLConf.DELTA_COMMIT_INFO_ENABLED)) {commitInfo = CommitInfo(clock.getTimeMillis(),op.name,op.jsonEncodedValues,Map.empty,Some(readVersion).filter(_ >= 0),None,Some(isBlindAppend))finalActions = commitInfo +: finalActions}
当 commitInfo.enabled 参数设置为 true(默认值),那么还需要将 commitInfo 写入到事务日志文件里面。CommitInfo 里面包含了操作时间、操作的类型(Write、Update等)等重要信息。
接下来开始调用 doCommit 方法。
2. doCommit 方法
doCommit 方法传入两个参数:
val commitVersion = doCommit(snapshot.version + 1, finalActions, 0)
doCommit 方法的第一个参数传递是 snapshot.version + 1。snapshot.version 其实就是事务日志中最新的版本,我们再来查看一下 Delta Lake 表的目录信息:
如果snapshot.version 的值为1,那么这次 doCommit 的更新版本为 2。
doCommit 方法具体内容如下:
private def doCommit(attemptVersion: Long,actions: Seq[Action],attemptNumber: Int): Long = deltaLog.lockInterruptibly {try {logDebug(s"Attempting to commit version $attemptVersion with ${actions.size} actions")// 1. 正式写事务日志的操作deltaLog.store.write(deltaFile(deltaLog.logPath, attemptVersion),actions.map(_.json).toIterator)val commitTime = System.nanoTime()// 2. 由于发生了数据更新,所以更新内存中事务日志的最新快照,并做相关判断val postCommitSnapshot = deltaLog.update()if (postCommitSnapshot.version < attemptVersion) {throw new IllegalStateException(s"The committed version is $attemptVersion " +s"but the current version is ${postCommitSnapshot.version}.")}// 3. 发送一些统计信息var numAbsolutePaths = 0var pathHolder: Path = nullval distinctPartitions = new mutable.HashSet[Map[String, String]]val adds = actions.collect {case a: AddFile =>pathHolder = new Path(new URI(a.path))if (pathHolder.isAbsolute) numAbsolutePaths += 1distinctPartitions += a.partitionValuesa}val stats = CommitStats(startVersion = snapshot.version,commitVersion = attemptVersion,readVersion = postCommitSnapshot.version,txnDurationMs = NANOSECONDS.toMillis(commitTime - txnStartNano),commitDurationMs = NANOSECONDS.toMillis(commitTime - commitStartNano),numAdd = adds.size,numRemove = actions.collect { case r: RemoveFile => r }.size,bytesNew = adds.filter(_.dataChange).map(_.size).sum,numFilesTotal = postCommitSnapshot.numOfFiles,sizeInBytesTotal = postCommitSnapshot.sizeInBytes,protocol = postCommitSnapshot.protocol,info = Option(commitInfo).map(_.copy(readVersion = None, isolationLevel = None)).orNull,newMetadata = newMetadata,numAbsolutePaths,numDistinctPartitionsInAdd = distinctPartitions.size,isolationLevel = null)recordDeltaEvent(deltaLog, "delta.commit.stats", data = stats)attemptVersion} catch {case e: java.nio.file.FileAlreadyExistsException =>checkAndRetry(attemptVersion, actions, attemptNumber)}}
根据注释标记的数字顺序介绍:
1. 正式写事务日志的操作,其中 store 是通过 spark.delta.logStore.class 参数指定的,目前支持 HDFS、S3、Local 等存储介质,默认是 HDFS。具体的写事务操作的过程,接下来介绍。
2. 持久化事务日志之后,更新内存中的事务日志最新的快照,然后做相关的合法性校验。
3. 发送一些统计信息。
我们针对 deltaLog 写事务日志操作专门进行解说:
deltaLog.store.write(deltaFile(deltaLog.logPath, attemptVersion),actions.map(_.json).toIterator)
write 方法传入两个参数:
HDFS路径,deltaFile 方法返回值
/** Returns the path for a given delta file. */def deltaFile(path: Path, version: Long): Path = new Path(path, f"$version%020d.json")
actions,doCommit 方法传入的参数 finalActions
write 方法的实现支持好几种存储,比如HDFS、S3、Azure等,这里以大数据平台常用的 HDFS 分布式存储系统来分析。
HDFSLogStore 类实现 LogStore 接口,查看 write 方法的实现:
def write(path: Path, actions: Iterator[String], overwrite: Boolean = false): Unit = {val isLocalFs = path.getFileSystem(getActiveHadoopConf).isInstanceOf[RawLocalFileSystem]if (isLocalFs) {synchronized {writeInternal(path, actions, overwrite)}} else {writeInternal(path, actions, overwrite)}}
其实 write 调用的核心方法为 writeInternal,如下:
private def writeInternal(path: Path, actions: Iterator[String], overwrite: Boolean): Unit = {// 1. 获取 HDFS 的 FileContext 用于后面写事务日志val fc = getFileContext(path)// 2. 如果需要写的事务日志已经存在那么就需要抛出异常,后面再重试if (!overwrite && fc.util.exists(path)) {// This is needed for the tests to throw error with local file systemthrow new FileAlreadyExistsException(path.toString)}// 3. 事务日志先写到临时文件val tempPath = createTempPath(path)var streamClosed = false // This flag is to avoid double closevar renameDone = false // This flag is to save the delete operation in most of cases.val stream = fc.create(tempPath, EnumSet.of(CREATE), CreateOpts.checksumParam(ChecksumOpt.createDisabled()))try {// 4. 将本次修改产生的 actions 写入到临时事务日志里actions.map(_ + "\n").map(_.getBytes(UTF_8)).foreach(stream.write)stream.close()streamClosed = truetry {val renameOpt = if (overwrite) Options.Rename.OVERWRITE else Options.Rename.NONE// 5. 将临时的事务日志移到正式的事务日志里面,如果移动失败则抛出异常,后面再重试fc.rename(tempPath, path, renameOpt)renameDone = true// TODO: this is a workaround of HADOOP-16255 - remove this when HADOOP-16255 is resolvedtryRemoveCrcFile(fc, tempPath)} catch {case e: org.apache.hadoop.fs.FileAlreadyExistsException =>throw new FileAlreadyExistsException(path.toString)}} finally {if (!streamClosed) {stream.close()}// 删除临时事务日志if (!renameDone) {fc.delete(tempPath, false)}}}
writeInternal 方法的实现过程,就是对 HDFS 进行写文件操作,结合上面数字标记的顺序,具体说明如下:
获取 HDFS 的 FileContext 用于写事务日志
如果需要写的事务日志已经存在,那么就需要抛出异常,然后再重试
写事务日志的时候是先写到表
_delta_lake目录下的临时文件里面将本次更新操作的事务记录写到临时文件里
写完事务日志之后,需要将临时事务日志最后移动动正式的日志文件里面。这里需要注意,在写事务日志文件的过程中同样存在多个用户修改表,拿 00000000000000000004.json 这个文件举例,很可能已经被别的修改占用了,这时候也需要抛出 FileAlreadyExistsException 异常,以便后面重试
到此,Delta Lake 的事务日志写操作就完成了。这里需要注意的是,doCommit 有可能会失败,抛出 FileAlreadyExistsException 异常。Delta Lake 在实现 doCommit 方法时捕获了这个异常,并在异常捕获里面调用 checkAndRetry(attemptVersion, actions, attemptNumber) 方法进行重试操作:
} catch {case e: java.nio.file.FileAlreadyExistsException =>checkAndRetry(attemptVersion, actions, attemptNumber)}
checkAndRetry 方法非常简单,这里就不细说了,只是需要注意,重试的版本是刚刚更新内存中事务日志快照的版本加上1:
// 因为上次更新事务日志发生冲突,所以需要再一次读取磁盘中持久化的事务日志,并更新内存中事务日志快照deltaLog.update()// 重试的版本是刚刚更新内存中事务日志快照的 version + 1val nextAttempt = deltaLog.snapshot.version + 1
checkAndRetry 方法只有在事务日志写冲突的时候才会出现,主要目的是重写当前的事务日志。
当事务日志成功持久化到磁盘之后,最后再执行 postCommit 操作。
3. postCommit 方法
protected def postCommit(commitVersion: Long, commitActions: Seq[Action]): Unit = {committed = trueif (commitVersion != 0 && commitVersion % deltaLog.checkpointInterval == 0) {try {deltaLog.checkpoint()} catch {case e: IllegalStateException =>logWarning("Failed to checkpoint table state.", e)}}}
postCommit 的实现相对来说是最简单的,功能就是判断需不需要对事务日志做一次 checkpoint 操作。deltaLog.checkpointInterval 的值是通过 spark.databricks.delta.properties.defaults.checkpointInterval 参数设置的,默认每写10次事务日志做一次 checkpoint。
checkpoint 的其实就是将内存中事务日志的最新快照持久化到磁盘里面,如下:
/delta/mydelta.db/user_info/_delta_log/00000000000000000010.checkpoint.parquet 文件就是对事务日志进行 checkpoint 的文件,里面汇总了 00000000000000000000.json - 00000000000000000010.json 之间的所有事务操作记录。那么下一次如果再构建事务日志的快照时,只需要从 00000000000000000010.checkpoint.parquet 文件以及往后更新的文件开始构造,而无需再读取 00000000000000000000.json 到 00000000000000000010.json 之间的事务操作。
另外,我们还可以从 HDFS 路径看出,checkpoint 之后还会生成一个 _last_checkpoint 文件,里面记录了最后一次 checkpoint 的版本,checkpoint 文件里面的 Action 条数,如下:
{"version":10,"size":13}
到此,笔者已经带大家完成了对 Delta Lake 事务日志的源码实现的研究,希望大家对 Delta Lake 的认识更深一层。