【问题标题】:sorting RDD elements对 RDD 元素进行排序
【发布时间】:2015-08-23 16:38:13
【问题描述】:

对于一个研究项目,我尝试对 RDD 中的元素进行排序。我用两种不同的方法做到了这一点。

在第一种方法中,我在 RDD 上应用了一个 mapPartitions() 函数,以便它对 RDD 的内容进行排序,并提供一个包含排序列表作为 RDD 中唯一记录的结果 RDD。然后,我应用了一个 reduce 函数,它基本上合并了排序列表。

我在一个包含 30 个节点的 EC2 集群上运行了这些实验。我使用 spark ec2 脚本设置它。数据文件存储在 HDFS 中。

在第二种方法中,我使用了 Spark 中的 sortBy 方法。

我对@9​​87654321@找到的美国人口普查数据(100MB)进行了这些操作

单行看起来像这样

9, Not in universe, 0, 0, Children, 0, Not in universe, Never married, Not in universe or children, Not in universe, White, All other, Female, Not in universe, Not in universe, Children or Armed Forces, 0, 0, 0, Nonfiler, Not in universe, Not in universe, Child <18 never marr not in subfamily, Child under 18 never married, 1758.14, Nonmover, Nonmover, Nonmover, Yes, Not in universe, 0, Both parents present, United-States, United-States, United-States, Native- Born in the United States, 0, Not in universe, 0, 0, 94, - 50000.

我根据 CSV 中的第 25 个值进行排序。在这一行中,即 1758.14。

我注意到 sortBy 的性能比其他方法差。这是预期的情况吗?如果是,为什么 mapPartitions() 和 reduce() 不是默认的排序方法?

这是我的实现

public static void sortBy(JavaSparkContext sc){
        JavaRDD<String> rdd = sc.textFile("/data.txt",32);
        long start = System.currentTimeMillis();
        rdd.sortBy(new Function<String, Double>(){

            @Override
                public Double call(String v1) throws Exception {
                      // TODO Auto-generated method stub
                  String [] arr = v1.split(",");
                  return Double.parseDouble(arr[24]);   
                }
        }, true, 9).collect();
        long end = System.currentTimeMillis();
        System.out.println("SortBy: " + (end - start));
  }

public static void sortList(JavaSparkContext sc){
        JavaRDD<String> rdd = sc.textFile("/data.txt",32); //parallelize(l, 8);
        long start = System.currentTimeMillis();
        JavaRDD<LinkedList<Tuple2<Double, String>>> rdd3 = rdd.mapPartitions(new FlatMapFunction<Iterator<String>, LinkedList<Tuple2<Double, String>>>(){

        @Override
        public Iterable<LinkedList<Tuple2<Double, String>>> call(Iterator<String> t)
            throws Exception {
          // TODO Auto-generated method stub
          LinkedList<Tuple2<Double, String>> lines = new LinkedList<Tuple2<Double, String>>();
          while(t.hasNext()){       
            String s = t.next();
            String arr1[] = s.split(",");
            Tuple2<Double, String> t1 = new Tuple2<Double, String>(Double.parseDouble(arr1[24]),s);
            lines.add(t1);
          }
          Collections.sort(lines, new IncomeComparator());
          LinkedList<LinkedList<Tuple2<Double, String>>> list = new LinkedList<LinkedList<Tuple2<Double, String>>>();
          list.add(lines);
          return list;
        }

        });
        rdd3.reduce(new Function2<LinkedList<Tuple2<Double, String>>, LinkedList<Tuple2<Double, String>>, LinkedList<Tuple2<Double, String>>>(){

        @Override
        public LinkedList<Tuple2<Double, String>> call(
                LinkedList<Tuple2<Double, String>> a,
                LinkedList<Tuple2<Double, String>> b) throws Exception {
          // TODO Auto-generated method stub
          LinkedList<Tuple2<Double, String>> result = new LinkedList<Tuple2<Double, String>>();
          while (a.size() > 0 && b.size() > 0) {

            if (a.getFirst()._1.compareTo(b.getFirst()._1) <= 0)
              result.add(a.poll());
            else
              result.add(b.poll());
          }

          while (a.size() > 0)
            result.add(a.poll());

          while (b.size() > 0)
            result.add(b.poll());

          return result;

        }

        });     
        long end = System.currentTimeMillis();
        System.out.println("MapPartitions: " + (end - start));
  }

【问题讨论】:

  • 这对于邮件列表来说可能是一个更好的问题。

标签: sorting apache-spark rdd


【解决方案1】:

Collect() 是一个主要瓶颈,因为它将所有结果返回给驱动程序。
它为单个源(在本例中为驱动程序)产生 IO 命中和额外的网络流量。
它还阻止其他操作。

在您的第一个 sortBy() 代码段中,而不是 collect(), 尝试执行并行操作,例如 saveAsTextFile(tmp),而不是使用 sc.textFile(tmp) 回读。

另一个 sortBy() 代码段同时使用 mapPartitions() 和 reduce() 并行 API - 因此整个工作是并行完成的。
这似乎是端到端性能时间差异的原因。

请注意,您的发现并不一定意味着所有机器的执行时间总和更差。

【讨论】:

    猜你喜欢
    • 2016-02-19
    • 2021-07-13
    • 2021-06-04
    • 2015-08-23
    • 1970-01-01
    • 1970-01-01
    • 2016-04-13
    • 2012-01-27
    • 1970-01-01
    相关资源
    最近更新 更多