【问题标题】:Sorting JavaPairRDD first by value and then by key首先按值排序 JavaPairRDD,然后按键排序
【发布时间】:2016-11-03 01:13:08
【问题描述】:

我正在尝试按值对 RDD 进行排序,如果多个值相等,则我需要按字典顺序按键对这些值进行排序。

代码:

JavaPairRDD <String,Long> rddToSort = rddMovieReviewReducedByKey.mapToPair(new PairFunction < Tuple2 < String, MovieReview > , String, Long > () {

    @Override
    public Tuple2 < String, Long > call(Tuple2 < String, MovieReview > t) throws Exception {
        return new Tuple2 < String, Long > (t._1, t._2.count);
    }
});

到目前为止我所做的是,使用takeOrdered 并提供CustomComperator,但是由于takeOrdered 无法处理大量数据,因此在运行代码时它会一直退出(它吃了很多操作系统无法处理的内存):

List < Tuple2 < String, Long >> rddSorted = rddMovieReviewReducedByKey.mapToPair(new PairFunction < Tuple2 < String, MovieReview > , String, Long > () {

    @Override
    public Tuple2 < String, Long > call(Tuple2 < String, MovieReview > t) throws Exception {
        return new Tuple2 < String, Long > (t._1, t._2.count);
    }
}).takeOrdered(newTopMovies, MapLongValueComparator.VALUE_COMP);

比较器:

    static class MapLongValueComparator implements Comparator < Tuple2 < String, Long >> , Serializable {
        private static final long serialVersionUID = 1L;

        private static final MapLongValueComparator VALUE_COMP = new MapLongValueComparator();

        @Override
        public int compare(Tuple2 < String, Long > o1, Tuple2 < String, Long > o2) {
            if (o1._2.compareTo(o2._2) == 0) {
                return o1._1.compareTo(o2._1);
            }
            return -o1._2.compareTo(o2._2);
        }
}

错误:

16/06/30 21:09:23 INFO scheduler.DAGScheduler: Job 18 failed: takeOrdered at MovieAnalyzer.java:708, took 418.149182 s

你会如何排序这个 RDD?您将如何考虑 TopKMovies 的价值,以及在字典中相等键的情况下。

谢谢。

【问题讨论】:

  • 你能提供堆栈跟踪(如果有的话?)。因为您提到它可能是内存问题,但错误消息不允许查看到底发生了什么。
  • @Serhiy 我猜这是内存问题,因为 takeOrdered 操作需要很长时间,因为它在分布式模式下处理大量数据,我也得到了退出代码:137 和退出代码:1 .以其他方式接近排序肯定会解决问题。
  • 您是否尝试过重新分区数据?当您映射到配对时,您可以立即重新分区。
  • 分区会将您的数据分成多个部分,因此具有配置限制的机器可以在部分而不是整个数据上工作,这在某些时候会停止适合机器内存(不要忘记 Spark 确实内存计算)。在将值映射到配对后,您可以调用 partitionBy 方法。您将需要通过实现您的分区器,在非常简单的情况下,它可以简单地按字符串的第一个字母进行分区(我猜这是电影名称?)。您可能需要进一步试验分区,以防它仍然会 OOM。
  • 可以解决takeOrdered对大量数据的超时问题,但是不能正确排序!

标签: java hadoop apache-spark


【解决方案1】:

在将&lt;String, Long&gt; PairRDD 映射到&lt; Tuple2&lt;String,Long&gt; , Long&gt; PairRDD 后,使用带有比较器和分区的 sortByKey 解决了问题

JavaPairRDD <Tuple2<String,Long>, Long> sortedRdd = rddMovieReviewReducedByKey.mapToPair(new PairFunction < Tuple2 < String, MovieReview > , Tuple2<String,Long>, Long > () {

    @Override
    public Tuple2 < Tuple2<String,Long>, Long > call(Tuple2 < String, MovieReview > t) throws Exception {
        return new Tuple2 < Tuple2<String,Long>, Long > (new Tuple2<String,Long>(t._1,t._2.count), t._2.count);
    }
}).sortByKey(new TupleMapLongComparator(), true, 100);


JavaPairRDD <String,Long> sortedRddToPairs = sortedRdd.mapToPair(new PairFunction<Tuple2<Tuple2<String,Long>,Long>, String, Long>() {

    @Override
    public Tuple2<String, Long> call(
            Tuple2<Tuple2<String, Long>, Long> t) throws Exception {
        return new Tuple2 < String, Long > (t._1._1, t._1._2);
    }

});

比较器:

private class TupleMapLongComparator implements Comparator<Tuple2<String,Long>>, Serializable {
    @Override
    public int compare(Tuple2<String,Long> tuple1, Tuple2<String,Long> tuple2) {

        if (tuple1._2.compareTo(tuple2._2) == 0) {
            return tuple1._1.compareTo(tuple2._1);
        }
        return -tuple1._2.compareTo(tuple2._2);
    }
}

【讨论】:

    【解决方案2】:

    你在 Spark 中尝试过二次排序吗?

    Spark Secondary Sort

    【讨论】:

      猜你喜欢
      • 2014-01-18
      • 1970-01-01
      • 1970-01-01
      • 2023-03-05
      • 2013-06-29
      • 1970-01-01
      • 2013-08-29
      • 2013-11-19
      • 2019-06-26
      相关资源
      最近更新 更多