【问题标题】:Spark RDD checkpoint on persisted/cached RDDs are performing the DAG twice持久化/缓存 RDD 上的 Spark RDD 检查点执行 DAG 两次
【发布时间】:2015-09-13 17:06:33
【问题描述】:

当我运行如下代码时:

val newRDD = prevRDD.map(a => (a._1, 1L)).distinct.persist(StorageLevel.MEMORY_AND_DISK_SER)
newRDD.checkpoint
print(newRDD.count())

并观察 Yarn 中的各个阶段,我注意到 Spark 正在执行 DAG 计算 TWICE - 一次用于实现 RDD 并缓存它的 distinct+count,然后完全是第二次创建检查点副本。

既然 RDD 已经物化和缓存了,为什么检查点不简单地利用这一点,将缓存的分区保存到磁盘?

是否有现有的方法(某种配置设置或代码更改)来强制 Spark 利用这一点并且只运行一次操作,而检查点只会复制内容?

我是否需要“物化”两次?

val newRDD = prevRDD.map(a => (a._1, 1L)).distinct.persist(StorageLevel.MEMORY_AND_DISK_SER)
print(newRDD.count())

newRDD.checkpoint
print(newRDD.count())

我创建了一个 Apache Spark Jira 票证来提出这个功能请求: https://issues.apache.org/jira/browse/SPARK-8666

【问题讨论】:

    标签: caching apache-spark rdd persist checkpoint


    【解决方案1】:

    看起来这可能是一个已知问题。查看旧的 JIRA 票证,https://issues.apache.org/jira/browse/SPARK-8582

    【讨论】:

      【解决方案2】:

      这是一个老问题。但它也影响了我,所以我做了一些挖掘。我在 jira 和 github 的变更跟踪历史中发现了一堆非常无用的搜索结果。这些搜索结果包含大量来自开发人员关于他们提议的编程更改的技术喋喋不休。这对我来说并没有提供非常丰富的信息,我建议限制您查看它的时间。

      我能找到的关于这件事的最清楚的信息在这里: https://github.com/JerryLead/SparkInternals/blob/master/markdown/english/6-CacheAndCheckpoint.md

      需要检查点的 RDD 将被计算两次;因此建议在 rdd.checkpoint() 之前做一个 rdd.cache()

      鉴于 OP 实际上确实使用了持久性和检查点,他可能走在正确的轨道上。我怀疑唯一的问题是他调用检查点的方式。我对火花还很陌生,但我认为他应该这样做:

      newRDD = newRDD.checkpoint

      希望这很清楚。根据我的测试,这消除了我的一个数据帧的冗余重新计算。

      【讨论】:

        【解决方案3】:

        你缓存的数据可能会因为内存不足而被驱逐,你可以打开Spark UI查看是否为真。

        【讨论】:

          猜你喜欢
          • 1970-01-01
          • 2016-07-17
          • 2016-01-16
          • 2019-07-08
          • 2014-08-25
          • 1970-01-01
          • 2019-01-08
          • 1970-01-01
          • 2016-02-24
          相关资源
          最近更新 更多