Persistence
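Streaming does nothing special here: a DStream is ultimately scheduled as jobs over each of its RDDs, so persistence works per RDD, exactly as in plain Spark. The difference is that a stream is unbounded, so cleanup has to be considered.
During clearMetadata, expired RDDs are removed and also unpersisted.
NetworkInputDStream is a special case: it always persists, because the incoming stream data is first converted into persisted blocks, and NetworkInputDStream then reads its data directly from those blocks.
According to the design, NetworkInputDStream stores two copies of the source data to guard against loss, but I could not find that logic in the code; I only saw a single copy being written to the blockManager.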

Checkpoint
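Checkpointing has a special significance in Streaming.
In plain Spark, the absence of a checkpoint does not affect correctness, because any data can be replayed from the source, and the source data usually lives on HDFS. There, a checkpoint is only an optimization.
Spark also handles failover only at the worker level: if a worker dies, its tasks are simply replayed on another worker. There is no driver failover, so if the driver dies, the job fails.
Spark sees itself as a query engine, and a failed query costs little: just run it again.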

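For Spark Streaming the problem is not that simple. If the driver dies and nothing is done about it, where should processing resume after recovery?
First of all, data will inevitably be lost, which affects correctness: stream data is unbounded, so you cannot replay all of it as you would in Spark, even when the source supports replay, as Kafka does.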

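So checkpointing in Streaming has two parts: checkpointing the RDDs and checkpointing the DStreamGraph.
RDD checkpointing is the same as in Spark; there is no difference.
The purpose of DStreamGraph checkpointing is to restore the graph's runtime state after the StreamingContext is restarted.
a. The whole graph object is serialized to a file. The key field is outputStreams: it looks as if only the final outputStreams are persisted, but in fact every DStream in the graph is persisted,
because def dependencies: List[DStream[_]] contains all the parent DStreams of a DStream, and following it recursively reaches every DStream object.
Once the DStream objects are restored, how is the RDD state of that moment restored? generatedRDDs is @transient, so it is not persisted.
The answer is in DStream.DStreamCheckpointData: at checkpoint time, currentCheckpointFiles records the (time, checkpoint file name) of every RDD in generatedRDDs that has finished checkpointing.
So on recovery it is enough to read the RDDs back from their checkpoint files and add them to generatedRDDs.
The checkpoint files also need cleaning up: each time a DStreamGraph checkpoint completes, the checkpoint files of all RDDs older than the oldest RDD in that graph can be deleted, because those old RDDs can never be used again.
b. Besides the graph object, the other important field in the Checkpoint object is pendingTimes, which records the batch times whose jobs had not yet been run at checkpoint time.
When the JobScheduler restarts, it resubmits those jobs. This is at-least-once logic: since it is unknown how long after the checkpoint the crash happened, some of those jobs may already have completed successfully.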
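
To make point a more concrete, here is a toy sketch in plain Scala, not Spark code. The names Node, parents, generated, checkpointFiles and roundTrip are all made up for illustration: parents plays the role of dependencies, generated the role of the @transient generatedRDDs, and checkpointFiles the role of currentCheckpointFiles. It only assumes standard Java serialization behaviour.

```scala
import java.io.{ByteArrayInputStream, ByteArrayOutputStream, ObjectInputStream, ObjectOutputStream}
import scala.collection.mutable

object GraphCheckpointSketch {
  // Hypothetical stand-in for a DStream; `parents` plays the role of `dependencies`.
  final class Node(val name: String, val parents: List[Node]) extends Serializable {
    // Like generatedRDDs: runtime state that Java serialization skips.
    @transient var generated: mutable.HashMap[Long, String] =
      new mutable.HashMap[Long, String]()
    // Like currentCheckpointFiles: (batch time -> file name), serialized with the node.
    val checkpointFiles = new mutable.HashMap[Long, String]()

    // Rebuild the transient state from the serialized file names, then recurse into parents.
    def restore(): Unit = {
      generated = new mutable.HashMap[Long, String]()
      checkpointFiles.foreach { case (time, file) =>
        generated += ((time, "RDD read from " + file))
      }
      parents.foreach(_.restore())
    }
  }

  // Serialize only the output node; its parents are written too, via object references.
  def roundTrip(output: Node): Node = {
    val bos = new ByteArrayOutputStream()
    val oos = new ObjectOutputStream(bos)
    oos.writeObject(output)
    oos.close()
    val ois = new ObjectInputStream(new ByteArrayInputStream(bos.toByteArray))
    try ois.readObject().asInstanceOf[Node] finally ois.close()
  }
}
```

After roundTrip, the parent node is present even though only the output node was written, while generated is null until restore() rebuilds it from checkpointFiles.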

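Creating a checkpoint:
1. In JobGenerator, each time a set of jobs is submitted to Spark, DoCheckpoint is executed, which serializes the Checkpoint object to a file (the Checkpoint object contains the graph object, among other information).
2. After DoCheckpoint completes, ClearCheckpointData is invoked to delete the checkpoint files of expired RDDs.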

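Using a checkpoint:
1. Call StreamingContext.getOrCreate, which uses CheckpointReader.read to deserialize the Checkpoint object from file and then uses that object to initialize the StreamingContext.
2. StreamingContext calls cp_.graph.restoreCheckpointData to restore the generatedRDDs of every DStream.
3. JobGenerator calls restart to resubmit the jobs that had not yet been submitted as of the checkpoint.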

 

DStreamGraph

final private[streaming] class DStreamGraph extends Serializable with Logging {

  private val inputStreams = new ArrayBuffer[InputDStream[_]]()
  private val outputStreams = new ArrayBuffer[DStream[_]]()

  var rememberDuration: Duration = null
  var checkpointInProgress = false

  var zeroTime: Time = null
  var startTime: Time = null
  var batchDuration: Duration = null

  def updateCheckpointData(time: Time) {
    logInfo("Updating checkpoint data for time " + time)
    this.synchronized {
      outputStreams.foreach(_.updateCheckpointData(time))
    }
    logInfo("Updated checkpoint data for time " + time)
  }

  def clearCheckpointData(time: Time) {
    logInfo("Clearing checkpoint data for time " + time)
    this.synchronized {
      outputStreams.foreach(_.clearCheckpointData(time))
    }
    logInfo("Cleared checkpoint data for time " + time)
  }

  def restoreCheckpointData() {
    logInfo("Restoring checkpoint data")
    this.synchronized {
      outputStreams.foreach(_.restoreCheckpointData())
    }
    logInfo("Restored checkpoint data")
  }
}

DStreamCheckpointData

private[streaming]
class DStreamCheckpointData[T: ClassTag] (dstream: DStream[T])
  extends Serializable with Logging {
  protected val data = new HashMap[Time, AnyRef]()

  // Mapping of the batch time to the checkpointed RDD file of that time
  @transient private var timeToCheckpointFile = new HashMap[Time, String] // (time, checkpoint file name) of every RDD checkpointed so far
  // Mapping of the batch time to the time of the oldest checkpointed RDD
  // in that batch's checkpoint data
  @transient private var timeToOldestCheckpointFileTime = new HashMap[Time, Time] // maps each graph checkpoint time to the time of the oldest RDD in that checkpoint

  @transient private var fileSystem : FileSystem = null
  protected[streaming] def currentCheckpointFiles = data.asInstanceOf[HashMap[Time, String]] // (time, checkpoint file name) of the RDDs covered by the current checkpoint

  /**
   * Updates the checkpoint data of the DStream. This gets called every time
   * the graph checkpoint is initiated. Default implementation records the
   * checkpoint files to which the generate RDDs of the DStream has been saved.
   */
  def update(time: Time) {

    // Get the checkpointed RDDs from the generated RDDs
    val checkpointFiles = dstream.generatedRDDs.filter(_._2.getCheckpointFile.isDefined) // keep only the RDDs in dstream.generatedRDDs that have finished checkpointing
                                       .map(x => (x._1, x._2.getCheckpointFile.get))
    // Add the checkpoint files to the data to be serialized 
    if (!checkpointFiles.isEmpty) {
      currentCheckpointFiles.clear()
      currentCheckpointFiles ++= checkpointFiles // refresh currentCheckpointFiles
      // Add the current checkpoint files to the map of all checkpoint files
      // This will be used to delete old checkpoint files
      timeToCheckpointFile ++= currentCheckpointFiles 
      // Remember the time of the oldest checkpoint RDD in current state
      timeToOldestCheckpointFileTime(time) = currentCheckpointFiles.keys.min(Time.ordering) // time of the oldest RDD in this checkpoint
    }
  }

  /**
   * Cleanup old checkpoint data. This gets called after a checkpoint of `time` has been
   * written to the checkpoint directory.
   */
  def cleanup(time: Time) {
    // Get the time of the oldest checkpointed RDD that was written as part of the
    // checkpoint of `time`
    timeToOldestCheckpointFileTime.remove(time) match { // holds lastCheckpointFileTime, the time of the oldest RDD in the checkpoint taken at `time`
      case Some(lastCheckpointFileTime) =>
        // Find all the checkpointed RDDs (i.e. files) that are older than `lastCheckpointFileTime`
        // This is because checkpointed RDDs older than this are not going to be needed
        // even after master fails, as the checkpoint data of `time` does not refer to those files
        val filesToDelete = timeToCheckpointFile.filter(_._1 < lastCheckpointFileTime) // every checkpoint file older than lastCheckpointFileTime gets deleted
        logDebug("Files to delete:\n" + filesToDelete.mkString(","))
        filesToDelete.foreach {
          case (time, file) =>
            try {
              val path = new Path(file)
              if (fileSystem == null) {
                fileSystem = path.getFileSystem(dstream.ssc.sparkContext.hadoopConfiguration)
              }
              fileSystem.delete(path, true)
              timeToCheckpointFile -= time
              logInfo("Deleted checkpoint file '" + file + "' for time " + time)
            } catch {
              // exception handling omitted in this excerpt
            }
        }
      case None =>
        logDebug("Nothing to delete")
    }
  }

  /**
   * Restore the checkpoint data. This gets called once when the DStream graph
   * (along with its DStreams) are being restored from a graph checkpoint file.
   * Default implementation restores the RDDs from their checkpoint files.
   */
  def restore() {
    // Create RDDs from the checkpoint data
    currentCheckpointFiles.foreach {
      case(time, file) => {
        // Restore: rebuild the RDD from its checkpoint file and add it to dstream.generatedRDDs
        dstream.generatedRDDs += ((time, dstream.context.sparkContext.checkpointFile[T](file)))
      }
    }
  }
}
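
The update/cleanup bookkeeping above can be modeled with plain Long timestamps. The following is a simplified sketch rather than the Spark class: CheckpointFileTracker and its fields are hypothetical names, and instead of deleting files it just returns the names that would be deleted.

```scala
import scala.collection.mutable

// Hypothetical, simplified model of the bookkeeping in DStreamCheckpointData.
class CheckpointFileTracker {
  // Every checkpoint file seen so far: RDD time -> file name (cf. timeToCheckpointFile).
  val allFiles = new mutable.HashMap[Long, String]()
  // Graph checkpoint time -> oldest RDD time it refers to (cf. timeToOldestCheckpointFileTime).
  val oldestAt = new mutable.HashMap[Long, Long]()

  // Record the files referenced by the graph checkpoint taken at `time`.
  def update(time: Long, current: Map[Long, String]): Unit = {
    if (current.nonEmpty) {
      allFiles ++= current
      oldestAt(time) = current.keys.min
    }
  }

  // Once the graph checkpoint of `time` is written, forget and return every file
  // strictly older than the oldest RDD that checkpoint still refers to.
  def cleanup(time: Long): List[String] = oldestAt.remove(time) match {
    case Some(oldest) =>
      val stale = allFiles.filter(_._1 < oldest).toList.sortBy(_._1)
      stale.foreach { case (t, _) => allFiles -= t }
      stale.map(_._2)
    case None => Nil
  }
}
```

The point of keying cleanup on the graph checkpoint time is that a file only becomes deletable once a newer graph checkpoint that no longer references it has been written.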

DStream

//DStream
  // Checkpoint details
  private[streaming] val mustCheckpoint = false
  private[streaming] var checkpointDuration: Duration = null
  private[streaming] val checkpointData = new DStreamCheckpointData(this)
  
 /**
   * Enable periodic checkpointing of RDDs of this DStream
   * @param interval Time interval after which generated RDD will be checkpointed
   */
  def checkpoint(interval: Duration): DStream[T] = {
    if (isInitialized) {
      throw new UnsupportedOperationException(
        "Cannot change checkpoint interval of an DStream after streaming context has started")
    }
    persist()
    checkpointDuration = interval
    this
  }

  /**
   * Refresh the list of checkpointed RDDs that will be saved along with checkpoint of
   * this stream. This is an internal method that should not be called directly. This is
   * a default implementation that saves only the file names of the checkpointed RDDs to
   * checkpointData. Subclasses of DStream (especially those of InputDStream) may override
   * this method to save custom checkpoint data.
   */
  private[streaming] def updateCheckpointData(currentTime: Time) {
    checkpointData.update(currentTime)
    dependencies.foreach(_.updateCheckpointData(currentTime))
  }

  private[streaming] def clearCheckpointData(time: Time) {
    checkpointData.cleanup(time)
    dependencies.foreach(_.clearCheckpointData(time))
  }

  /**
   * Restore the RDDs in generatedRDDs from the checkpointData. This is an internal method
   * that should not be called directly. This is a default implementation that recreates RDDs
   * from the checkpoint file names stored in checkpointData. Subclasses of DStream that
   * override the updateCheckpointData() method would also need to override this method.
   */
  private[streaming] def restoreCheckpointData() {
    // Create RDDs from the checkpoint data
    checkpointData.restore()
    dependencies.foreach(_.restoreCheckpointData())
  }

JobGenerator
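1. At the end of each runJobs, i.e. each time a new set of jobs has been submitted, DoCheckpoint is executed to write the Checkpoint object to a file.
2. On restart, the jobs for pendingTimes + downTimes are run again, which guarantees at-least-once semantics.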

//JobGenerator
  private lazy val checkpointWriter =
    if (ssc.checkpointDuration != null && ssc.checkpointDir != null) {
      new CheckpointWriter(this, ssc.conf, ssc.checkpointDir, ssc.sparkContext.hadoopConfiguration)
  } else {
    null
  }

  /** Generate jobs and perform checkpoint for the given `time`.  */
  private def generateJobs(time: Time) {
    SparkEnv.set(ssc.env)
    Try(graph.generateJobs(time)) match {
      case Success(jobs) => jobScheduler.runJobs(time, jobs)
      case Failure(e) => jobScheduler.reportError("Error generating jobs for time " + time, e)
    }
    eventActor ! DoCheckpoint(time)  // after runJobs returns, checkpoint the DStreamGraph
  }

  /** Perform checkpoint for the give `time`. */
  private def doCheckpoint(time: Time) = synchronized {
    if (checkpointWriter != null && (time - graph.zeroTime).isMultipleOf(ssc.checkpointDuration)) {
      ssc.graph.updateCheckpointData(time)  // first refresh currentCheckpointFiles of every DStream in the graph
      checkpointWriter.write(new Checkpoint(ssc, time))  // then write the Checkpoint object to a file through checkpointWriter
    }
  }

  def onCheckpointCompletion(time: Time) {
    eventActor ! ClearCheckpointData(time) // once the DStreamGraph checkpoint is done, clear the checkpoint files of older RDDs
  }

  /** Clear DStream checkpoint data for the given `time`. */
  private def clearCheckpointData(time: Time) {
    ssc.graph.clearCheckpointData(time)
  }


  /** Restarts the generator based on the information in checkpoint */
  private def restart() {
    // If manual clock is being used for testing, then
    // either set the manual clock to the last checkpointed time,
    // or if the property is defined set it to that time
    if (clock.isInstanceOf[ManualClock]) {
      val lastTime = ssc.initialCheckpoint.checkpointTime.milliseconds
      val jumpTime = ssc.sc.conf.getLong("spark.streaming.manualClock.jump", 0)
      clock.asInstanceOf[ManualClock].setTime(lastTime + jumpTime)
    }

    val batchDuration = ssc.graph.batchDuration

    // Batches when the master was down, that is,
    // between the checkpoint and current restart time
    val checkpointTime = ssc.initialCheckpoint.checkpointTime
    val restartTime = new Time(timer.getRestartTime(graph.zeroTime.milliseconds))
    val downTimes = checkpointTime.until(restartTime, batchDuration)  // batch times between the last checkpoint and the restart
    logInfo("Batches during down time (" + downTimes.size + " batches): "
      + downTimes.mkString(", "))

    // Batches that were unprocessed before failure
    val pendingTimes = ssc.initialCheckpoint.pendingTimes.sorted(Time.ordering) // jobsets that were still pending when the graph was checkpointed
    logInfo("Batches pending processing (" + pendingTimes.size + " batches): " +
      pendingTimes.mkString(", "))
    // Reschedule jobs for these times
    val timesToReschedule = (pendingTimes ++ downTimes).distinct.sorted(Time.ordering) // times to reschedule = pendingTimes + downTimes
    logInfo("Batches to reschedule (" + timesToReschedule.size + " batches): " +
      timesToReschedule.mkString(", "))
    timesToReschedule.foreach(time =>
      jobScheduler.runJobs(time, graph.generateJobs(time))
    )

    // Restart the timer
    timer.start(restartTime.milliseconds)
    logInfo("JobGenerator restarted at " + restartTime)
  }
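
The rescheduling arithmetic in restart() can be tried out with plain Long milliseconds. This is a sketch with hypothetical names (RescheduleSketch, downTimes, timesToReschedule); it assumes that Time.until excludes the end time, in the same way a Scala `until` range does.

```scala
object RescheduleSketch {
  // Batch times in [checkpointTime, restartTime), one per batch interval.
  // Assumption: this mirrors what checkpointTime.until(restartTime, batchDuration) yields.
  def downTimes(checkpointTime: Long, restartTime: Long, batchMs: Long): List[Long] =
    (checkpointTime until restartTime by batchMs).toList

  // Union of pending and down-time batches, de-duplicated and in time order.
  def timesToReschedule(pending: Seq[Long], down: Seq[Long]): List[Long] =
    (pending ++ down).distinct.sorted.toList

  def main(args: Array[String]): Unit = {
    val down = downTimes(3000L, 6000L, 1000L)
    println(timesToReschedule(Seq(3000L, 2000L), down).mkString(", "))
  }
}
```

A batch that appears in both lists is rescheduled once, thanks to distinct. A batch that had in fact finished just before the crash is still run again, which is where the at-least-once behaviour comes from.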

StreamingContext
When a checkpoint file exists, the Checkpoint object is read first and then used to initialize the StreamingContext,
so that every DStream in the graph is restored from the Checkpoint.

//StreamingContext 
class StreamingContext private[streaming] (
    sc_ : SparkContext,
    cp_ : Checkpoint,
    batchDur_ : Duration
  ) extends Logging {

  private[streaming] val isCheckpointPresent = (cp_ != null)

  private[streaming] val graph: DStreamGraph = {
    if (isCheckpointPresent) {
      cp_.graph.setContext(this)
      cp_.graph.restoreCheckpointData()
      cp_.graph
    } else {
      assert(batchDur_ != null, "Batch duration for streaming context cannot be null")
      val newGraph = new DStreamGraph()
      newGraph.setBatchDuration(batchDur_)
      newGraph
    }
  }

  /**
   * Set the context to periodically checkpoint the DStream operations for driver
   * fault-tolerance.
   * @param directory HDFS-compatible directory where the checkpoint data will be reliably stored.
   *                  Note that this must be a fault-tolerant file system like HDFS for
   */
  def checkpoint(directory: String) {  // only creates the checkpointDir; the function name is poorly chosen
    if (directory != null) {
      val path = new Path(directory)
      val fs = path.getFileSystem(sparkContext.hadoopConfiguration)
      fs.mkdirs(path)
      val fullPath = fs.getFileStatus(path).getPath().toString
      sc.setCheckpointDir(fullPath)
      checkpointDir = fullPath
    } else {
      checkpointDir = null
    }
  }

  private[streaming] def initialCheckpoint: Checkpoint = {
    if (isCheckpointPresent) cp_ else null
  }
}

object StreamingContext extends Logging {

  /**
   * Either recreate a StreamingContext from checkpoint data or create a new StreamingContext.
   * If checkpoint data exists in the provided `checkpointPath`, then StreamingContext will be
   * recreated from the checkpoint data. If the data does not exist, then the StreamingContext
   * will be created by called the provided `creatingFunc`.
   *
   * @param checkpointPath Checkpoint directory used in an earlier StreamingContext program
   * @param creatingFunc   Function to create a new StreamingContext
   * @param hadoopConf     Optional Hadoop configuration if necessary for reading from the
   *                       file system
   * @param createOnError  Optional, whether to create a new StreamingContext if there is an
   *                       error in reading checkpoint data. By default, an exception will be
   *                       thrown on error.
   */
  def getOrCreate(
      checkpointPath: String,
      creatingFunc: () => StreamingContext,
      hadoopConf: Configuration = new Configuration(),
      createOnError: Boolean = false
    ): StreamingContext = {
    val checkpointOption = try { // read the Checkpoint object from the checkpoint file
      CheckpointReader.read(checkpointPath,  new SparkConf(), hadoopConf)
    } catch {
      case e: Exception =>
        if (createOnError) {
          None
        } else {
          throw e
        }
    }
    checkpointOption.map(new StreamingContext(null, _, null)).getOrElse(creatingFunc()) // initialize the StreamingContext from the Checkpoint object
  }
}
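
The control flow of getOrCreate has three outcomes: restore from a checkpoint, create fresh when there is none, and, depending on createOnError, either create fresh or rethrow when reading fails. The generic sketch below isolates just that flow; RecoverOrCreate and the read/restore/create parameters are hypothetical stand-ins for CheckpointReader.read, the checkpoint-based constructor and creatingFunc.

```scala
object RecoverOrCreate {
  // read:    hypothetical stand-in for CheckpointReader.read
  // restore: hypothetical stand-in for building a context from a checkpoint
  // create:  hypothetical stand-in for creatingFunc
  def getOrCreate[C, S](
      read: () => Option[C],
      restore: C => S,
      create: () => S,
      createOnError: Boolean = false): S = {
    val checkpointOption =
      try read()
      catch {
        // A read error counts as "no checkpoint" only when createOnError is set.
        case e: Exception => if (createOnError) None else throw e
      }
    checkpointOption.map(restore).getOrElse(create())
  }
}
```

With createOnError left at false, a corrupt checkpoint makes the call fail instead of silently starting from scratch, which seems the safer default for a job that is expected to resume.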

Checkpoint (org.apache.spark.streaming)
Checkpoint exists mainly to checkpoint the DStreamGraph object; CheckpointWriter serializes the Checkpoint to a file.

private[streaming]
class Checkpoint(@transient ssc: StreamingContext, val checkpointTime: Time)
  extends Logging with Serializable {
  val master = ssc.sc.master
  val framework = ssc.sc.appName
  val sparkHome = ssc.sc.getSparkHome.getOrElse(null)
  val jars = ssc.sc.jars
  val graph = ssc.graph  // the graph is the key piece of state to checkpoint
  val checkpointDir = ssc.checkpointDir
  val checkpointDuration = ssc.checkpointDuration
  // times of the jobsets in JobScheduler.jobSets that have not been run yet
  val pendingTimes = ssc.scheduler.getPendingTimes().toArray
  val delaySeconds = MetadataCleaner.getDelaySeconds(ssc.conf)
  val sparkConfPairs = ssc.conf.getAll

}

CheckpointWriter, which writes the Checkpoint object to a file

/**
 * Convenience class to handle the writing of graph checkpoint to file
 */
private[streaming]
class CheckpointWriter(
    jobGenerator: JobGenerator,
    conf: SparkConf,
    checkpointDir: String,
    hadoopConf: Configuration
  ) extends Logging {
  val MAX_ATTEMPTS = 3
  val executor = Executors.newFixedThreadPool(1)
  val compressionCodec = CompressionCodec.createCodec(conf)
  private var stopped = false
  private var fs_ : FileSystem = _

  class CheckpointWriteHandler(checkpointTime: Time, bytes: Array[Byte]) extends Runnable {
    def run() {
      var attempts = 0
      val startTime = System.currentTimeMillis()
      val tempFile = new Path(checkpointDir, "temp")  // temporary file
      val checkpointFile = Checkpoint.checkpointFile(checkpointDir, checkpointTime)  // final checkpoint file
      val backupFile = Checkpoint.checkpointBackupFile(checkpointDir, checkpointTime)  // backup file

      while (attempts < MAX_ATTEMPTS && !stopped) {
        attempts += 1
        try {
          logInfo("Saving checkpoint for time " + checkpointTime + " to file '" + checkpointFile
            + "'")

          // Write checkpoint to temp file first
          fs.delete(tempFile, true)   // just in case it exists
          val fos = fs.create(tempFile)
          fos.write(bytes)
          fos.close()

          // If the checkpoint file exists, back it up
          // If the backup exists as well, just delete it, otherwise rename will fail
          if (fs.exists(checkpointFile)) {
            fs.delete(backupFile, true) // just in case it exists
            if (!fs.rename(checkpointFile, backupFile)) {   // rename the current checkpoint file to the backup file
              logWarning("Could not rename " + checkpointFile + " to " + backupFile)
            }
          }

          // Then rename temp file to the final checkpoint file
          if (!fs.rename(tempFile, checkpointFile)) {
            logWarning("Could not rename " + tempFile + " to " + checkpointFile)
          }

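          // Delete old checkpoint files
          // (take(n) returns nothing for n <= 0, so as written nothing is deleted until more than 10 files exist)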
          val allCheckpointFiles = Checkpoint.getCheckpointFiles(checkpointDir, fs)
          if (allCheckpointFiles.size > 4) {
            allCheckpointFiles.take(allCheckpointFiles.size - 10).foreach(file => {
              logInfo("Deleting " + file)
              fs.delete(file, true)
            })
          }

          // All done, print success 
          val finishTime = System.currentTimeMillis()
          jobGenerator.onCheckpointCompletion(checkpointTime)  // when the checkpoint completes, trigger jobGenerator.onCheckpointCompletion
          return
        } catch {
          // exception handling omitted in this excerpt
        }
      }
    }
  }

  def write(checkpoint: Checkpoint) {
    val bos = new ByteArrayOutputStream()
    val zos = compressionCodec.compressedOutputStream(bos)
    val oos = new ObjectOutputStream(zos)
    oos.writeObject(checkpoint)  // serialize the Checkpoint object
    oos.close()
    bos.close()
    try {
      executor.execute(new CheckpointWriteHandler(checkpoint.checkpointTime, bos.toByteArray)) // run CheckpointWriteHandler on the executor thread to write the bytes to a file
    } catch {
      // exception handling omitted in this excerpt
    }
  }
}
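
The write-to-temp, back-up, then rename sequence in CheckpointWriteHandler is a general pattern and can be tried on a local file system. The sketch below uses java.nio.file instead of the Hadoop FileSystem API, so Path here is java.nio.file.Path, not the Hadoop class. AtomicCheckpointWrite and the ".bk" suffix are names chosen for illustration, and the retry loop is left out.

```scala
import java.nio.file.{Files, Path, StandardCopyOption}

object AtomicCheckpointWrite {
  // Write `bytes` to dir/name without ever exposing a half-written target file:
  // write a temp file, move the previous version aside as a backup, rename into place.
  def write(dir: Path, name: String, bytes: Array[Byte]): Path = {
    Files.createDirectories(dir)
    val temp = dir.resolve("temp")
    val target = dir.resolve(name)
    val backup = dir.resolve(name + ".bk") // hypothetical backup naming

    Files.deleteIfExists(temp) // in case an earlier attempt left one behind
    Files.write(temp, bytes)

    // Keep the previous checkpoint as a backup, replacing any older backup.
    if (Files.exists(target)) {
      Files.move(target, backup, StandardCopyOption.REPLACE_EXISTING)
    }

    // The target name is free at this point, so a plain move suffices.
    Files.move(temp, target)
    target
  }
}
```

If the process dies between the two moves, the directory still holds the backup of the previous checkpoint, which is why a reader should be prepared to fall back to older files.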

CheckpointReader

private[streaming]
object CheckpointReader extends Logging {

  def read(checkpointDir: String, conf: SparkConf, hadoopConf: Configuration): Option[Checkpoint] =
  {
    val checkpointPath = new Path(checkpointDir)
    def fs = checkpointPath.getFileSystem(hadoopConf)
    
    // Try to find the checkpoint files 
    val checkpointFiles = Checkpoint.getCheckpointFiles(checkpointDir, fs).reverse
    if (checkpointFiles.isEmpty) {
      return None
    }

    // Try to read the checkpoint files in the order  
    logInfo("Checkpoint files found: " + checkpointFiles.mkString(","))
    val compressionCodec = CompressionCodec.createCodec(conf)
    checkpointFiles.foreach(file => {
      logInfo("Attempting to load checkpoint from file " + file)
      try {
        val fis = fs.open(file)
        // ObjectInputStream uses the last defined user-defined class loader in the stack
        // to find classes, which maybe the wrong class loader. Hence, a inherited version
        // of ObjectInputStream is used to explicitly use the current thread's default class
        // loader to find and load classes. This is a well know Java issue and has popped up
        // in other places (e.g., http://jira.codehaus.org/browse/GROOVY-1627)
        val zis = compressionCodec.compressedInputStream(fis)
        val ois = new ObjectInputStreamWithLoader(zis,
          Thread.currentThread().getContextClassLoader)
        val cp = ois.readObject.asInstanceOf[Checkpoint]  // deserialize the file contents into a Checkpoint object
        ois.close()
        fs.close()
        cp.validate()
        return Some(cp)
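      } catch {
        // exception handling omitted in this excerpt
      }
    })
    // (the path taken when none of the files could be read is omitted in this excerpt)
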
  }
}
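
The reader's strategy, trying checkpoint files from newest to oldest and stopping at the first one that loads, can be written compactly with Try. This is a sketch, not the Spark implementation: NewestReadable, readNewest and the load parameter are hypothetical, and returning None when nothing is readable is a choice made here for simplicity.

```scala
import scala.util.{Success, Try}

object NewestReadable {
  // `filesOldestFirst` is ordered oldest to newest.
  // `load` is a hypothetical stand-in for deserializing one checkpoint file.
  // Returns the first successful load, starting from the newest file.
  def readNewest[A](filesOldestFirst: Seq[String])(load: String => A): Option[A] =
    filesOldestFirst.reverseIterator
      .map(file => Try(load(file))) // lazy: older files are only tried on failure
      .collectFirst { case Success(value) => value }
}
```

Because the iterator is lazy, an older file is touched only when every newer one has failed to load.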


 

 
