【问题标题】:Apache Spark Method returning an RDD (with Tail Recursion)返回 RDD 的 Apache Spark 方法(使用尾递归)
【发布时间】:2015-06-16 10:24:39
【问题描述】:

一个 RDD 有一个沿袭,因此在对其执行操作之前不存在;所以,如果我有一个对 RDD 执行大量转换并返回转换后的 RDD 的方法,那么我实际上返回的是什么? 在某项操作需要该 RDD 之前,我是否不返回任何内容?如果我在方法中缓存了一个 RDD,它会保留在缓存中吗?我想我知道这个问题的答案:只有在返回的 RDD 上调用操作时才会运行该方法?但我可能是错的。

这个问题的扩展是: 如果我有一个尾递归方法,它以 RDD 作为参数并返回一个 RDD,但我在方法中缓存 RDD:

def method(myRDD : RDD) : RDD = {
   ...
   anRDD.cache
   if(true) return someRDD
   method(someRDD) // tailrec
}

然后,当尾递归发生时,它会覆盖先前缓存的 RDD anRDD 还是两者都保留?我想两者都会坚持下去。当我使用的数据集只有 63mb 大时,我将数据溢出到磁盘。而且我认为这可能与尾递归方法有关。

【问题讨论】:

    标签: caching recursion apache-spark rdd


    【解决方案1】:

    RDD 沿袭构建为 RDD 对象实例链接在一起的图,其中沿袭中的每个节点都引用其依赖项。在它最简单的链形式中,您可以将其视为链表:

    hadoopRDD(location) <-depends- filteredRDD(f:A->Boolean) <-depends- mappedRDD(f:A->B)
    

    您可以在基本 RDD 构造函数中欣赏这一点:

    /** Construct an RDD with just a one-to-one dependency on one parent */
      def this(@transient oneParent: RDD[_]) =
        this(oneParent.context , List(new OneToOneDependency(oneParent))) 
    

    言归正传:就像我们可以递归地构建一个链表一样,我们也可以构建一个 RDD 沿袭。作用于 RDD 的递归函数的结果将是一个定义良好的 RDD。

    将需要一个操作来安排该沿袭的执行,并将实现由它表示的计算,就像创建后可以“遍历”一个链表一样。

    考虑这个(我必须承认,相当做作)的例子:

    def isPrime(n:Int):Boolean = {
        (n == 2) || (!( n % 2 ==0) && !((3 to math.sqrt(n).ceil.toInt) exists (x => n % x == 0)))
    }
    
    def recPrimeFilter(rdd:RDD[Int], i:Int):RDD[Int] = 
    if (i<=1) rdd else if (isPrime(i)) recPrimeFilter(rdd.filter(x=> x!=i), i-1) else (recPrimeFilter(rdd.map(x=>x+i), i-1))
    

    当应用于整数的 RDD 时,我们可以使用交错过滤器观察谱系并映射素数位置的结果:

    val rdd = sc.parallelize(1 to 100)
    val res = weirdPrimeFilter(rdd,15)
    scala> res.toDebugString
    res3: String = 
    (8) FilteredRDD[54] at filter at <console>:18 []
     |  FilteredRDD[53] at filter at <console>:18 []
     |  MappedRDD[52] at map at <console>:18 []
     |  FilteredRDD[51] at filter at <console>:18 []
     |  MappedRDD[50] at map at <console>:18 []
     |  FilteredRDD[49] at filter at <console>:18 []
     |  MappedRDD[48] at map at <console>:18 []
     |  MappedRDD[47] at map at <console>:18 []
     |  MappedRDD[46] at map at <console>:18 []
     |  FilteredRDD[45] at filter at <console>:18 []
     |  MappedRDD[44] at map at <console>:18 []
     |  FilteredRDD[43] at filter at <console>:18 []
     |  MappedRDD[42] at map at <console>:18 []
     |  MappedRDD[41] at map at <console>:18 []
     |  ParallelCollectionRDD[33] at parallelize at <console>:13 []
    

    'cache' 打破了血统,使 RDD 在缓存点第一次经过时“记住”其内容,以便血统中更靠前的所有相关 RDD 可以重用该缓存数据。 在线性 RDD 沿袭的基本情况下,它根本没有任何影响,因为每个节点只会被访问一次。

    在这种情况下,如果递归 RDD 构造过程创建了一个图或树状结构,其中在许多不同的“叶”节点处调用操作,那么缓存可能是有意义的。

    【讨论】:

    • 谢谢你,非常有用的答案 - 我可能需要重新阅读它几次才能理解它。 rdd.unpersist的作用是什么?即当我有一个基于缓存 RDD 的 RDD 并且缓存的 RDD 未缓存时,会发生什么?以前的血统会重新出现/出现吗?
    猜你喜欢
    • 1970-01-01
    • 1970-01-01
    • 2015-06-08
    • 2014-05-13
    • 1970-01-01
    • 2016-08-03
    • 1970-01-01
    • 2014-01-24
    • 2015-07-05
    相关资源
    最近更新 更多