【问题标题】:spark exception when using filter() within map()在 map() 中使用 filter() 时引发异常
【发布时间】:2016-12-05 07:54:37
【问题描述】:

我正在尝试在 map() 中使用 filter(),但我得到了这个 spark 异常:

RDD 转换和动作只能由驱动程序调用,不能在其他转换内部调用;例如,rdd1.map(x => rdd2.values.count() * x) 无效,因为值转换和计数操作无法在 rdd1.map 转换内部执行。有关详细信息,请参阅 SPARK-5063。

我知道 spark 不允许嵌套的转换/动作/RDD,所以任何人都可以给我一个建议如何做(没有嵌套的转换或动作),我有一个 RDD,它的元组就像:

 JavaRDD< String[]> RDD

我尝试映射它,给它一个列表作为参数,这个列表包含 javaPairRDDs 这样的:

List<JavaPairRDD<String,String>> list
JavaRDD< String[]> result = RDD.map(new modifyRDD(list));

这些行是指 modifyRDD() 函数:

public static class modifyRDD implements Function <String[], String[]> { 

    List<JavaPairRDD<String,String>> list;
    public modifyRDD (List<JavaPairRDD<String,String>> list ){ this.list=list;}

    public String [] call(String[] t) {

          String[] s = t;

          for (int i = 0; i < NB_TD; i++) {         
            int j=i; 
         // select the appropriate RDD from the RDDs_list to the current index 

            JavaPairRDD<String,String> rdd_i = list.get(i);
            String previousElement=s[j];

           JavaPairRDD<String,String> currentRDD =  rdd_i.filter(line -> line._1().equals(previousElement));

           String newElement=currentRDD.first()._2();   

           s[j]=newElement;
                }

          return   (s) ;

    }


    }

所以,问题出在这一行

  JavaPairRDD<String,String> currentRDD =  rdd_i.filter(line -> line._1().equals(previousElement));

现在我举个例子,假设列表包含 2 个 PairRDDs

list={PairRDD1={(a,b)(c,d)},PairRDD2={(u,v)(x,y)}..}

我要映射的 RDD 包含:

 JavaRDD< String[]> RDD = {[a,u],[c,x],[a,x].....}

我想要的结果在 map() 之后:

 JavaRDD< String[]> result = {[b,v],[d,y],[b,y].....}

【问题讨论】:

  • 您是否阅读过错误信息和相应的JIRA? Spark 不支持嵌套的操作和转换,就像它不支持嵌套的 RDD。
  • 是的,我做了,但我能做些什么作为替代解决方案
  • 这个问题在 SO 上已经讨论了很多次了... 简短回答:a) RDD 很小,收集并使用局部变量或广播 b) RDD 很大,将这个问题表达为 join
  • 在我的情况下它很大,请你给我可以遵循的步骤
  • @zero323 如果你注意到我在 map 函数中使用了 pairRDDS 列表,bcz 每个索引我都需要不同的 pairRDD,那么我如何才能将我的 RDD 与所有这些 pairRDDS 连接起来(如果您看到例如,我有 2 个 pairRDDS,在我的情况下,我不知道列表中有多少 pairRDD)

标签: java scala apache-spark key-value rdd


【解决方案1】:

我将列表的类型从 List> 更改为 List>> list 以避免处理 map() 中的 RDD,现在我没有例外(当然 bcz 我没有嵌套转换),但我不确定关于它是如果我的新代码是有效的,bcz List> 很大,并且为了搜索一个元素,我使用了一个循环“for”(意味着我必须扫描整个 List> 以获得我想要的元素)所以我问你作为专家给我评论(使用循环),并提出改善它的建议。谢谢

这是修改后的map()函数

  public static class modifyRDD implements Function <String[], String[]> { 

    List<List<Tuple2<String,String>>> list;
    public modifyRDD (List<List<Tuple2<String,String>>> list ){ this.list=list;}

    public String [] call(String[] t) {

          String[] s = t;

          for (int i = 0; i < NB_TD; i++) {         

         // select the appropriate lookup_list 

            List<Tuple2<String,String>> list_i = list.get(i);
            String previousElement=s[i];
            String newElement="";

            for (int k = 0; k < list_i.size(); k++){

            Tuple2<String,String> sk1 = list_i.get(k);
            if (sk1._1.equals( previousElement)){  newElement=sk1._2;}

            }


           s[i]= newElement;
                }
         return(s);
                                   }

【讨论】:

    猜你喜欢
    • 2014-11-07
    • 1970-01-01
    • 2019-01-29
    • 1970-01-01
    • 1970-01-01
    • 2016-11-21
    • 2017-10-03
    • 2017-11-25
    • 1970-01-01
    相关资源
    最近更新 更多