环境:

spark 2.3.3

scala 2.11.8

Java 1.8.0_141

 

为什么要checkpoint?

checkpoint和persist功能相似,都是为了更高效的复用RDD,但是Checkpoint可以进行容错,即driver失败后,可以基于checkpoint 数据进行重新启动。

 

checkpoint推荐使用方式:一般在进行checkpoint方法调用前都要进行persist,来把当前RDD的数据持久化到内存或者磁盘上,这是因为checkpoint是lazy级别,必须有action触发job,且在该job执行完成后才会从后往前回溯哪个RDD进行了Checkpoint标记,然后对该标记了要进行Checkpoint的RDD新启动一个Job执行具体的Checkpoint过程。

 

下面我们从源码开始分析checkpoint的具体流程:

首先解析客户端程序:

通过SparkContext设置checkpoint的存储目录,然后对RDD调用checkpoint方法,标记该RDD为checkpointing,最后调用action算子进行作业提交。

Spark checkpoint 功能源码详解

 

SparkContext设置checkpoint的存储目录的源码如下:

SparkContext:

Spark checkpoint 功能源码详解

需要注意的是:在集群模式下,checkpoint的路径不能是本地路径,因为使用本地路径的话,checkpoint数据是存放在executor所在节点的本地文件系统中的,此时driver如果需要基于checkpoint数据重新构建RDD的话,会有问题的。

 

接着,我们看下checkpoint具体执行生效的地方

SparkContext:

Spark checkpoint 功能源码详解

 

rdd的checkpoint操作,是在action算子对应的Job执行完后,才进行checkpoint的,这样就不会影响原Job的执行效率。

 

下面我们具体分析RDD.doCheckpoint方法:

源码如下:RDD:

Spark checkpoint 功能源码详解

这里其实对父RDD递归调用了doCheckpoint方法。

 

具体解析如下:

Spark checkpoint 功能源码详解

 

Spark checkpoint 功能源码详解

 

从上面的源码分析中可以看到,checkpoint改变了原RDD的Lineage。

 

 

Spark checkpoint 功能源码详解

 

 

相关文章: