【发布时间】:2015-08-23 16:38:13
【问题描述】:
对于一个研究项目,我尝试对 RDD 中的元素进行排序。我用两种不同的方法做到了这一点。
在第一种方法中,我在 RDD 上应用了一个 mapPartitions() 函数,以便它对 RDD 的内容进行排序,并提供一个包含排序列表作为 RDD 中唯一记录的结果 RDD。然后,我应用了一个 reduce 函数,它基本上合并了排序列表。
我在一个包含 30 个节点的 EC2 集群上运行了这些实验。我使用 spark ec2 脚本设置它。数据文件存储在 HDFS 中。
在第二种方法中,我使用了 Spark 中的 sortBy 方法。
我对@987654321@找到的美国人口普查数据(100MB)进行了这些操作
单行看起来像这样
9, Not in universe, 0, 0, Children, 0, Not in universe, Never married, Not in universe or children, Not in universe, White, All other, Female, Not in universe, Not in universe, Children or Armed Forces, 0, 0, 0, Nonfiler, Not in universe, Not in universe, Child <18 never marr not in subfamily, Child under 18 never married, 1758.14, Nonmover, Nonmover, Nonmover, Yes, Not in universe, 0, Both parents present, United-States, United-States, United-States, Native- Born in the United States, 0, Not in universe, 0, 0, 94, - 50000.
我根据 CSV 中的第 25 个值进行排序。在这一行中,即 1758.14。
我注意到 sortBy 的性能比其他方法差。这是预期的情况吗?如果是,为什么 mapPartitions() 和 reduce() 不是默认的排序方法?
这是我的实现
public static void sortBy(JavaSparkContext sc){
JavaRDD<String> rdd = sc.textFile("/data.txt",32);
long start = System.currentTimeMillis();
rdd.sortBy(new Function<String, Double>(){
@Override
public Double call(String v1) throws Exception {
// TODO Auto-generated method stub
String [] arr = v1.split(",");
return Double.parseDouble(arr[24]);
}
}, true, 9).collect();
long end = System.currentTimeMillis();
System.out.println("SortBy: " + (end - start));
}
public static void sortList(JavaSparkContext sc){
JavaRDD<String> rdd = sc.textFile("/data.txt",32); //parallelize(l, 8);
long start = System.currentTimeMillis();
JavaRDD<LinkedList<Tuple2<Double, String>>> rdd3 = rdd.mapPartitions(new FlatMapFunction<Iterator<String>, LinkedList<Tuple2<Double, String>>>(){
@Override
public Iterable<LinkedList<Tuple2<Double, String>>> call(Iterator<String> t)
throws Exception {
// TODO Auto-generated method stub
LinkedList<Tuple2<Double, String>> lines = new LinkedList<Tuple2<Double, String>>();
while(t.hasNext()){
String s = t.next();
String arr1[] = s.split(",");
Tuple2<Double, String> t1 = new Tuple2<Double, String>(Double.parseDouble(arr1[24]),s);
lines.add(t1);
}
Collections.sort(lines, new IncomeComparator());
LinkedList<LinkedList<Tuple2<Double, String>>> list = new LinkedList<LinkedList<Tuple2<Double, String>>>();
list.add(lines);
return list;
}
});
rdd3.reduce(new Function2<LinkedList<Tuple2<Double, String>>, LinkedList<Tuple2<Double, String>>, LinkedList<Tuple2<Double, String>>>(){
@Override
public LinkedList<Tuple2<Double, String>> call(
LinkedList<Tuple2<Double, String>> a,
LinkedList<Tuple2<Double, String>> b) throws Exception {
// TODO Auto-generated method stub
LinkedList<Tuple2<Double, String>> result = new LinkedList<Tuple2<Double, String>>();
while (a.size() > 0 && b.size() > 0) {
if (a.getFirst()._1.compareTo(b.getFirst()._1) <= 0)
result.add(a.poll());
else
result.add(b.poll());
}
while (a.size() > 0)
result.add(a.poll());
while (b.size() > 0)
result.add(b.poll());
return result;
}
});
long end = System.currentTimeMillis();
System.out.println("MapPartitions: " + (end - start));
}
【问题讨论】:
-
这对于邮件列表来说可能是一个更好的问题。
标签: sorting apache-spark rdd