【发布时间】:2016-11-03 01:13:08
【问题描述】:
我正在尝试按值对 RDD 进行排序,如果多个值相等,则我需要按字典顺序按键对这些值进行排序。
代码:
JavaPairRDD <String,Long> rddToSort = rddMovieReviewReducedByKey.mapToPair(new PairFunction < Tuple2 < String, MovieReview > , String, Long > () {
@Override
public Tuple2 < String, Long > call(Tuple2 < String, MovieReview > t) throws Exception {
return new Tuple2 < String, Long > (t._1, t._2.count);
}
});
到目前为止我所做的是,使用takeOrdered 并提供CustomComperator,但是由于takeOrdered 无法处理大量数据,因此在运行代码时它会一直退出(它吃了很多操作系统无法处理的内存):
List < Tuple2 < String, Long >> rddSorted = rddMovieReviewReducedByKey.mapToPair(new PairFunction < Tuple2 < String, MovieReview > , String, Long > () {
@Override
public Tuple2 < String, Long > call(Tuple2 < String, MovieReview > t) throws Exception {
return new Tuple2 < String, Long > (t._1, t._2.count);
}
}).takeOrdered(newTopMovies, MapLongValueComparator.VALUE_COMP);
比较器:
static class MapLongValueComparator implements Comparator < Tuple2 < String, Long >> , Serializable {
private static final long serialVersionUID = 1L;
private static final MapLongValueComparator VALUE_COMP = new MapLongValueComparator();
@Override
public int compare(Tuple2 < String, Long > o1, Tuple2 < String, Long > o2) {
if (o1._2.compareTo(o2._2) == 0) {
return o1._1.compareTo(o2._1);
}
return -o1._2.compareTo(o2._2);
}
}
错误:
16/06/30 21:09:23 INFO scheduler.DAGScheduler: Job 18 failed: takeOrdered at MovieAnalyzer.java:708, took 418.149182 s
你会如何排序这个 RDD?您将如何考虑 TopKMovies 的价值,以及在字典中相等键的情况下。
谢谢。
【问题讨论】:
-
你能提供堆栈跟踪(如果有的话?)。因为您提到它可能是内存问题,但错误消息不允许查看到底发生了什么。
-
@Serhiy 我猜这是内存问题,因为 takeOrdered 操作需要很长时间,因为它在分布式模式下处理大量数据,我也得到了退出代码:137 和退出代码:1 .以其他方式接近排序肯定会解决问题。
-
您是否尝试过重新分区数据?当您映射到配对时,您可以立即重新分区。
-
分区会将您的数据分成多个部分,因此具有配置限制的机器可以在部分而不是整个数据上工作,这在某些时候会停止适合机器内存(不要忘记 Spark 确实内存计算)。在将值映射到配对后,您可以调用 partitionBy 方法。您将需要通过实现您的分区器,在非常简单的情况下,它可以简单地按字符串的第一个字母进行分区(我猜这是电影名称?)。您可能需要进一步试验分区,以防它仍然会 OOM。
-
可以解决takeOrdered对大量数据的超时问题,但是不能正确排序!
标签: java hadoop apache-spark