【发布时间】:2016-12-05 07:54:37
【问题描述】:
我正在尝试在 map() 中使用 filter(),但我得到了这个 spark 异常:
RDD 转换和动作只能由驱动程序调用,不能在其他转换内部调用;例如,rdd1.map(x => rdd2.values.count() * x) 无效,因为值转换和计数操作无法在 rdd1.map 转换内部执行。有关详细信息,请参阅 SPARK-5063。
我知道 spark 不允许嵌套的转换/动作/RDD,所以任何人都可以给我一个建议如何做(没有嵌套的转换或动作),我有一个 RDD,它的元组就像:
JavaRDD< String[]> RDD
我尝试映射它,给它一个列表作为参数,这个列表包含 javaPairRDDs 这样的:
List<JavaPairRDD<String,String>> list
JavaRDD< String[]> result = RDD.map(new modifyRDD(list));
这些行是指 modifyRDD() 函数:
public static class modifyRDD implements Function <String[], String[]> {
List<JavaPairRDD<String,String>> list;
public modifyRDD (List<JavaPairRDD<String,String>> list ){ this.list=list;}
public String [] call(String[] t) {
String[] s = t;
for (int i = 0; i < NB_TD; i++) {
int j=i;
// select the appropriate RDD from the RDDs_list to the current index
JavaPairRDD<String,String> rdd_i = list.get(i);
String previousElement=s[j];
JavaPairRDD<String,String> currentRDD = rdd_i.filter(line -> line._1().equals(previousElement));
String newElement=currentRDD.first()._2();
s[j]=newElement;
}
return (s) ;
}
}
所以,问题出在这一行
JavaPairRDD<String,String> currentRDD = rdd_i.filter(line -> line._1().equals(previousElement));
现在我举个例子,假设列表包含 2 个 PairRDDs
list={PairRDD1={(a,b)(c,d)},PairRDD2={(u,v)(x,y)}..}
我要映射的 RDD 包含:
JavaRDD< String[]> RDD = {[a,u],[c,x],[a,x].....}
我想要的结果在 map() 之后:
JavaRDD< String[]> result = {[b,v],[d,y],[b,y].....}
【问题讨论】:
-
您是否阅读过错误信息和相应的JIRA? Spark 不支持嵌套的操作和转换,就像它不支持嵌套的 RDD。
-
是的,我做了,但我能做些什么作为替代解决方案
-
这个问题在 SO 上已经讨论了很多次了... 简短回答:a) RDD 很小,收集并使用局部变量或广播 b) RDD 很大,将这个问题表达为 join
-
在我的情况下它很大,请你给我可以遵循的步骤
-
@zero323 如果你注意到我在 map 函数中使用了 pairRDDS 列表,bcz 每个索引我都需要不同的 pairRDD,那么我如何才能将我的 RDD 与所有这些 pairRDDS 连接起来(如果您看到例如,我有 2 个 pairRDDS,在我的情况下,我不知道列表中有多少 pairRDD)
标签: java scala apache-spark key-value rdd