【问题标题】:Can SparkContext.setCheckpointDir(hdfsPath) set same hdfsPath in different spark apps?SparkContext.setCheckpointDir(hdfsPath) 可以在不同的 Spark 应用程序中设置相同的 hdfsPath 吗?
【发布时间】:2020-07-08 10:51:39
【问题描述】:

作为文档:

https://spark.apache.org/docs/2.2.1/api/java/org/apache/spark/SparkContext.html#setCheckpointDir-java.lang.String-

SparkContext:

setCheckpointDir
public void setCheckpointDir(String directory)
Set the directory under which RDDs are going to be checkpointed.
Parameters:
directory - path to the directory where checkpoint files will be stored (must be HDFS path if running in cluster)

问题: 1)如果不同的spark应用SparkContext.setCheckpointDir(hdfsPath)设置相同的hdfsPath,会不会有冲突?

2) 如果没有冲突,CheckpointDir 的 hdfsPath 会自动清理吗?

【问题讨论】:

    标签: scala apache-spark


    【解决方案1】:

    问题:

    1)如果不同的spark应用SparkContext.setCheckpointDir(hdfsPath)设置相同的hdfsPath,会不会有冲突?

    答案:按照下面给出的例子没有冲突。多个应用程序可以使用相同的检查点目录。将在该唯一哈希类型的文件夹下创建以避免冲突。


    2) 如果没有冲突,CheckpointDir 的 hdfsPath 会自动清理吗?

    答案:是的它正在发生。对于下面的示例,我使用local 进行演示......但localhdfs 没关系。行为将是相同的。


    举个例子(用同一个检查点目录运行多次):

    package examples
    
    import java.io.File
    
    import org.apache.log4j.Level
    
    
    object CheckPointTest extends App {
      import org.apache.spark.sql.{Dataset, SparkSession}
      val spark = SparkSession.builder().appName("CheckPointTest").master("local").getOrCreate()
      val logger = org.apache.log4j.Logger.getLogger("org")
      logger.setLevel(Level.WARN)
      import spark.implicits._
    
      spark.sparkContext.setCheckpointDir("/tmp/checkpoints")
    
    
      val csvData1: Dataset[String] = spark.sparkContext.parallelize(
        """
          |id
          | a
          | b
          | c
    
        """.stripMargin.lines.toList).toDS()
      val frame1 = spark.read.option("header", true).option("inferSchema",true).csv(csvData1).show
    
      val checkpointDir = spark.sparkContext.getCheckpointDir.get
      println(checkpointDir)
    
    println("Number of Files in Check Point Directory " + getListOfFiles(checkpointDir).length)
    
    
      def getListOfFiles(dir: String):List[File] = {
        val d = new File(dir)
        if (d.exists && d.isDirectory) {
          d.listFiles.filter(_.isFile).toList
        } else {
          List[File]()
        }
      }
    }
    
    

    结果:

    +---+
    | id|
    +---+
    |  a|
    |  b|
    |  c|
    +---+
    
    file:/tmp/checkpoints/30e6f882-b49a-42cc-9e60-59adecf13166
    Number of Files in Check Point Directory 0 // this indicates once application finished removed all the RDD/DS information.
    
    

    如果你看一下检查点文件夹,它会是这样的......

    
    user@f0189843ecbe [~/Downloads]$ ll /tmp/checkpoints/
    total 0
    drwxr-xr-x  2 user  wheel   64 Mar 27 14:08 a2396c08-14b6-418a-b183-a90a4ca7dba3
    drwxr-xr-x  2 user  wheel   64 Mar 27 14:09 65c8ef5a-0e64-4e79-a050-7d1ee1d0e03d
    drwxr-xr-x  2 user  wheel   64 Mar 27 14:09 5667758c-180f-4c0b-8b3c-912afca59f55
    drwxr-xr-x  2 user  wheel   64 Mar 27 14:10 30e6f882-b49a-42cc-9e60-59adecf13166
    drwxr-xr-x  6 user  wheel  192 Mar 27 14:10 .
    drwxrwxrwt  5 root    wheel  160 Mar 27 14:10 ..
    user@f0189843ecbe [~/Downloads]$ du -h /tmp/checkpoints/
      0B    /tmp/checkpoints//a2396c08-14b6-418a-b183-a90a4ca7dba3
      0B    /tmp/checkpoints//5667758c-180f-4c0b-8b3c-912afca59f55
      0B    /tmp/checkpoints//65c8ef5a-0e64-4e79-a050-7d1ee1d0e03d
      0B    /tmp/checkpoints//30e6f882-b49a-42cc-9e60-59adecf13166
      0B    /tmp/checkpoints/
    

    结论:

    1) 即使多个应用程序并行运行,检查点目录下也会有唯一的哈希,所有的RDD/DS 信息将被存储。

    2) 成功后完全执行每个 Spark 应用程序,上下文清理器将删除中的内容 它..是我从上面的实际例子中观察到的。

    【讨论】:

    • 非常感谢~您推荐的学习此类机制的最佳方法如何?阅读火花代码?好像直接spark文档不是很清楚。
    • 从 github 读取 spark 代码是一种高级方法。但我建议您阅读多本书,您将对这些概念和技术有更好的理解。然后您可以尝试阅读和理解 github spark 代码以了解其工作原理的内部机制。它会打开思路。
    猜你喜欢
    • 2023-04-10
    • 2012-06-13
    • 2013-06-18
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多