RDD 沿袭构建为 RDD 对象实例链接在一起的图,其中沿袭中的每个节点都引用其依赖项。在它最简单的链形式中,您可以将其视为链表:
hadoopRDD(location) <-depends- filteredRDD(f:A->Boolean) <-depends- mappedRDD(f:A->B)
您可以在基本 RDD 构造函数中欣赏这一点:
/** Construct an RDD with just a one-to-one dependency on one parent */
def this(@transient oneParent: RDD[_]) =
this(oneParent.context , List(new OneToOneDependency(oneParent)))
言归正传:就像我们可以递归地构建一个链表一样,我们也可以构建一个 RDD 沿袭。作用于 RDD 的递归函数的结果将是一个定义良好的 RDD。
将需要一个操作来安排该沿袭的执行,并将实现由它表示的计算,就像创建后可以“遍历”一个链表一样。
考虑这个(我必须承认,相当做作)的例子:
def isPrime(n:Int):Boolean = {
(n == 2) || (!( n % 2 ==0) && !((3 to math.sqrt(n).ceil.toInt) exists (x => n % x == 0)))
}
def recPrimeFilter(rdd:RDD[Int], i:Int):RDD[Int] =
if (i<=1) rdd else if (isPrime(i)) recPrimeFilter(rdd.filter(x=> x!=i), i-1) else (recPrimeFilter(rdd.map(x=>x+i), i-1))
当应用于整数的 RDD 时,我们可以使用交错过滤器观察谱系并映射素数位置的结果:
val rdd = sc.parallelize(1 to 100)
val res = weirdPrimeFilter(rdd,15)
scala> res.toDebugString
res3: String =
(8) FilteredRDD[54] at filter at <console>:18 []
| FilteredRDD[53] at filter at <console>:18 []
| MappedRDD[52] at map at <console>:18 []
| FilteredRDD[51] at filter at <console>:18 []
| MappedRDD[50] at map at <console>:18 []
| FilteredRDD[49] at filter at <console>:18 []
| MappedRDD[48] at map at <console>:18 []
| MappedRDD[47] at map at <console>:18 []
| MappedRDD[46] at map at <console>:18 []
| FilteredRDD[45] at filter at <console>:18 []
| MappedRDD[44] at map at <console>:18 []
| FilteredRDD[43] at filter at <console>:18 []
| MappedRDD[42] at map at <console>:18 []
| MappedRDD[41] at map at <console>:18 []
| ParallelCollectionRDD[33] at parallelize at <console>:13 []
'cache' 打破了血统,使 RDD 在缓存点第一次经过时“记住”其内容,以便血统中更靠前的所有相关 RDD 可以重用该缓存数据。
在线性 RDD 沿袭的基本情况下,它根本没有任何影响,因为每个节点只会被访问一次。
在这种情况下,如果递归 RDD 构造过程创建了一个图或树状结构,其中在许多不同的“叶”节点处调用操作,那么缓存可能是有意义的。