如错误所示,Spark 不支持嵌套 RDD。通常你必须通过重新设计你的算法来绕过它。
如何做到这一点取决于实际用例、function 中究竟发生了什么以及它的输出。
有时RDD1.cartesian(RDD2),对每个元组进行操作,然后通过键减少将起作用。有时,如果你有 (K,V) 输入两个 RDD 之间的连接将起作用。
如果 RDD2 很小,您始终可以在驱动程序中收集它,将其设为广播变量并在 function 中使用该变量而不是 RDD2。
@编辑:
例如,假设您的 RDD 保存字符串,function 将计算来自RDD 的给定记录在RDD2 中出现的次数:
def function(line: String, rdd: RDD[String]): (String, Int) = {
(line, rdd.filter(_ == line).count)
}
这将返回一个RDD[(String, Int)]。
想法1
您可以使用 RDD 的 cartesian 方法尝试使用 cartesian product。
val cartesianProduct = RDD1.cartesian(RDD2) // creates RDD[(String, String)]
.map( (r1,r2) => (r1, function2) ) // creates RDD[(String, Int)]
.reduceByKey( (c1,c2) => c1 + c2 ) // final RDD[(String, Int)]
这里function2 接受r1 和r2(它们是字符串),如果它们相等则返回1,否则返回0。最终的映射将产生一个RDD,其中包含元组,其中键是来自r1 的记录,值是总计数。
问题 1:如果您在 RDD1 中有重复的字符串,这将不起作用。你得好好想想。如果RDD1 记录有一些独特的ID,那就完美了。
问题 2:这确实会创建很多对(对于两个 RDD 中的 100 万条记录,它会创建大约 5000 亿对),会很慢并且很可能会导致很多 shuffling。
创意2
我不明白您对 RDD2 大小 lacs 的评论,所以这可能有效,也可能无效:
val rdd2array = sc.broadcast(RDD2.collect())
val result = RDD1.map(line => function(line, rdd2array))
问题:这可能会破坏您的记忆。在driver 上调用collect(),来自rdd2 的all 记录将被加载到驱动程序节点的内存中。
想法3
根据用例的不同,还有其他方法可以克服这个问题,例如brute force algorithm for Similarity Search 与您的用例相似(双关语不是有意的)。对此的替代解决方案之一是Locality Sensitive Hashing。