【Question Title】:RDD lineage cache
【Posted】:2016-01-25 22:14:19
【Question】:

I am having trouble understanding lineage in the case of RDDs. For example:

Suppose we have this lineage:

hadoopRDD(location) <-depends- filteredRDD(f:A->Boolean) <-depends- mappedRDD(f:A->B)

Suppose we persist the first RDD and then unpersist it after some actions. Does this affect the other, dependent RDDs? If so, how can that be avoided?

What I mean is: if we unpersist a parent RDD, does that operation remove partitions from the child RDDs?

【Comments】:

    Tags: apache-spark rdd


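    【Answer 1】:

    Let's walk through an example. This creates an RDD holding a sequence of Ints in a single partition. The single partition is only there to keep the output ordered in the rest of the example.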

    scala> val seq = Seq(1,2,3,4,5)
    seq: Seq[Int] = List(1, 2, 3, 4, 5)
    
    scala> val rdd = sc.parallelize(seq, 1)
    rdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[11] at parallelize at <console>:23
    

    Now let's create two new RDDs: the first is a mapped version of the original RDD, and the second is a mapped version of the first:

    scala> val firstMappedRDD = rdd.map { case i => println(s"firstMappedRDD  calc for $i"); i * 2 }
    firstMappedRDD: org.apache.spark.rdd.RDD[Int] = MapPartitionsRDD[12] at map at <console>:25
    
    scala> firstMappedRDD.toDebugString
    res25: String = 
    (1) MapPartitionsRDD[12] at map at <console>:25 []
     |  ParallelCollectionRDD[11] at parallelize at <console>:23 []
    
    scala> val secondMappedRDD = firstMappedRDD.map { case i => println(s"secondMappedRDD calc for $i"); i * 2 }
    secondMappedRDD: org.apache.spark.rdd.RDD[Int] = MapPartitionsRDD[13] at map at <console>:27
    
    scala> secondMappedRDD.toDebugString
    res26: String = 
    (1) MapPartitionsRDD[13] at map at <console>:27 []
     |  MapPartitionsRDD[12] at map at <console>:25 []
     |  ParallelCollectionRDD[11] at parallelize at <console>:23 []
    

    We can see the lineage using toDebugString. I added printlns to each map step to make it clear when the map function is called. Let's collect each RDD and see what happens:

    scala> firstMappedRDD.collect()
    firstMappedRDD  calc for 1
    firstMappedRDD  calc for 2
    firstMappedRDD  calc for 3
    firstMappedRDD  calc for 4
    firstMappedRDD  calc for 5
    res27: Array[Int] = Array(2, 4, 6, 8, 10)
    
    scala> secondMappedRDD.collect()
    firstMappedRDD  calc for 1
    secondMappedRDD calc for 2
    firstMappedRDD  calc for 2
    secondMappedRDD calc for 4
    firstMappedRDD  calc for 3
    secondMappedRDD calc for 6
    firstMappedRDD  calc for 4
    secondMappedRDD calc for 8
    firstMappedRDD  calc for 5
    secondMappedRDD calc for 10
    res28: Array[Int] = Array(4, 8, 12, 16, 20)
    

    As you would expect, calling secondMappedRDD.collect() invokes the map of the first step again. So now let's cache the first mapped RDD.

    scala> firstMappedRDD.cache()
    res29: firstMappedRDD.type = MapPartitionsRDD[12] at map at <console>:25
    
    scala> secondMappedRDD.toDebugString
    res31: String = 
    (1) MapPartitionsRDD[13] at map at <console>:27 []
     |  MapPartitionsRDD[12] at map at <console>:25 []
     |  ParallelCollectionRDD[11] at parallelize at <console>:23 []
    
    scala> firstMappedRDD.count()
    firstMappedRDD  calc for 1
    firstMappedRDD  calc for 2
    firstMappedRDD  calc for 3
    firstMappedRDD  calc for 4
    firstMappedRDD  calc for 5
    res32: Long = 5
    
    scala> secondMappedRDD.toDebugString
    res33: String = 
    (1) MapPartitionsRDD[13] at map at <console>:27 []
     |  MapPartitionsRDD[12] at map at <console>:25 []
     |      CachedPartitions: 1; MemorySize: 120.0 B; ExternalBlockStoreSize: 0.0 B; DiskSize: 0.0 B
     |  ParallelCollectionRDD[11] at parallelize at <console>:23 []
    

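    Once the result of the first mapped RDD is actually in the cache, the lineage of the second mapped RDD shows the cached partitions of the first. Note that cache() is lazy: the CachedPartitions line only appears after the count() action, not right after the cache() call. Now let's call collect: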

    scala> secondMappedRDD.collect
    secondMappedRDD calc for 2
    secondMappedRDD calc for 4
    secondMappedRDD calc for 6
    secondMappedRDD calc for 8
    secondMappedRDD calc for 10
    res34: Array[Int] = Array(4, 8, 12, 16, 20)
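
The lazy behaviour of cache() seen above can be checked programmatically. The following is a minimal sketch, not part of the original answer: the name `doubled` and the app name are made up for illustration, and it assumes a local Spark installation (inside spark-shell, `getOrCreate` should simply return the existing context).

```scala
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.storage.StorageLevel

// Assumption: local mode with a single thread; in spark-shell this reuses the running context.
val sc = SparkContext.getOrCreate(
  new SparkConf().setMaster("local[1]").setAppName("lazy-cache-sketch"))

// Hypothetical RDD, one partition as in the example above.
val doubled = sc.parallelize(Seq(1, 2, 3, 4, 5), 1).map(_ * 2)

// cache() only marks the RDD with a storage level; nothing is stored yet.
doubled.cache()
val markedOnly = doubled.getStorageLevel == StorageLevel.MEMORY_ONLY
val cachedBeforeAction = doubled.toDebugString.contains("CachedPartitions")

// The first action computes the partitions and stores them.
doubled.count()
val cachedAfterAction = doubled.toDebugString.contains("CachedPartitions")

println(s"marked=$markedOnly before=$cachedBeforeAction after=$cachedAfterAction")
```

Outside spark-shell, call `sc.stop()` once you are done with the context.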
    

    Now let's unpersist and call collect again:

    scala> firstMappedRDD.unpersist()
    res36: firstMappedRDD.type = MapPartitionsRDD[12] at map at <console>:25
    
    scala> secondMappedRDD.toDebugString
    res37: String = 
    (1) MapPartitionsRDD[13] at map at <console>:27 []
     |  MapPartitionsRDD[12] at map at <console>:25 []
     |  ParallelCollectionRDD[11] at parallelize at <console>:23 []
    
    scala> secondMappedRDD.collect
    firstMappedRDD  calc for 1
    secondMappedRDD calc for 2
    firstMappedRDD  calc for 2
    secondMappedRDD calc for 4
    firstMappedRDD  calc for 3
    secondMappedRDD calc for 6
    firstMappedRDD  calc for 4
    secondMappedRDD calc for 8
    firstMappedRDD  calc for 5
    secondMappedRDD calc for 10
    res38: Array[Int] = Array(4, 8, 12, 16, 20)
    

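    So when we collect after the result of the first mapped RDD has been unpersisted, the first map is invoked again. unpersist() only drops the cached blocks of the RDD it is called on; the lineage of the child is unchanged (compare the toDebugString output before caching and after unpersisting), so the result is still correct, it is just recomputed.

    If the source were HDFS or any other storage, the data would be read from the source again.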
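
As for the "how can that be avoided" part of the question, one possible approach is to cache the child RDD and materialize it with an action before unpersisting the parent. The sketch below is an illustration under assumptions rather than something from the original answer: the names `parent` and `child` and the app name are hypothetical, and it assumes local mode so the printlns show up in the same console.

```scala
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.storage.StorageLevel

// Assumption: local mode with a single thread; in spark-shell this reuses the running context.
val sc = SparkContext.getOrCreate(
  new SparkConf().setMaster("local[1]").setAppName("unpersist-parent-sketch"))

// Hypothetical parent and child RDDs, mirroring the two mapped RDDs above.
val parent = sc.parallelize(Seq(1, 2, 3, 4, 5), 1)
  .map { i => println(s"parent calc for $i"); i * 2 }
val child = parent.map { i => println(s"child calc for $i"); i * 2 }

// Mark both for caching, then run an action on the child so that
// its partitions are actually computed and stored.
parent.cache()
child.cache()
child.count()

// Drop only the parent's cached blocks; the child's blocks are separate.
parent.unpersist(blocking = true)

// Served from the child's cache, so neither map function should run here,
// provided the child's cached partition has not been evicted.
val result = child.collect()
println(result.mkString(", "))
```

Caching is best effort: if the child's cached partitions are evicted or an executor is lost, Spark falls back to the lineage and recomputes the parent, which is the behaviour shown in the transcript above.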
