【问题标题】:Sorting using spark使用火花排序
【发布时间】:2017-10-09 14:58:00
【问题描述】:

我需要对 RDD 进行排序。排序需要在我的记录的多个字段上,因此我需要一个自定义比较器。

我看到 sortBy 因为它只接受一个键。我偶然发现了http://codingjunkie.net/spark-secondary-sort/,因此使用repartitionAndSortWithinPartitions 来实现同样的效果。

为什么sortBy 不接受自定义比较器和排序?为什么我必须重新分区才能使用自定义比较器?

【问题讨论】:

    标签: java sorting apache-spark distributed-computing


    【解决方案1】:

    问题1:这是方法sortBy签名

      /**
       * Return this RDD sorted by the given key function.
       */
      def sortBy[K](
          f: (T) => K,
          ascending: Boolean = true,
          numPartitions: Int = this.partitions.length)
          (implicit ord: Ordering[K], ctag: ClassTag[K]): RDD[T] = withScope {
        this.keyBy[K](f)
            .sortByKey(ascending, numPartitions)
            .values
      }
    

    你的 RDD 数据对象显然是 T 类型

    请注意 sortBy 方法绝对有单键参数字段: f: (T) => K

    它接受匿名函数,因此您可以轻松生成自定义的可比较结构,并充分利用具有自己明确定义的比较器的常见数据类型。

    例如,如果你的RDD[Int, Int],我们称之为data,你可以这样做:

    val cmp = (t: (Int, Int)) => (t._1, -t._2)
    data.sortBy(cmp)
    

    这样可以轻松实现多字段比较吧?

    这将得到一个排序的 RDD,其中第一个字段升序,第二个字段 下降。

    问题2:repartitionAndSortWithinPartitions 用法

    这是一个特定的 rdd 运算符,旨在比调用 repartition 然后在每个分区内排序更有效。

    您的程序在排序之前不需要预先重新分区,它只是在这种特殊的通用模式下进行内部优化以获得高性能。

    详情请咨询document

    【讨论】:

    【解决方案2】:
    • mapPartitions 使用例如 .sorted 对每个分区进行排序
    • repartitionAndSortWithinPartitions 有效地对分区进行排序 同时重新分区。
    • sortBy 进行全局排序 RDD
    • RDD 的 sortByKey 方法用于总排序

    • RDD 的 repartitionAndSortWithinPartitions 是在分区内使用排序,而不是跨分区,但不幸的是它增加了一个额外的步骤来进行重新分区

    如 Spark API 中所写,repartitionAndSortWithinPartitions 比调用 repartition 然后在每个分区内排序更有效,换句话说,repartitionAndSortWithinPartitions 将首先根据提供的分区器对数据进行重新分区,然后按 key 排序:

    所以先重新分区然后调用 sortBy 会给你很好的性能 您可以使用 repartitionAndSortWithinPartitions 实现同样的目标

    添加一些排序示例希望对您有所帮助。

    前 1

    val rdd = sc.parallelize(Seq(
         |                ("math",    55),
         |                ("math",    56),
         |                ("english", 57),
         |                ("english", 58),
         |                ("science", 59),
         |                ("science", 54)))
    
    rdd.collect()
    
    //Default Sorting : Ascending order
    val sorted1 = rdd.sortByKey()
    
     sorted1.collect()
    
     //Custom Sorting : Descending order (using implicit 'Ordering')
     {
         |    //Let us define an implicit sorting for the method sortByKey()
         |    //We have used '{' above to limit the scope of the implicit ordering
         |    implicit val sortIntegersByString = new Ordering[String] {
         |       override def compare(a: String, b: String) = {
         |          val result = a.compare(b)
         |          //We use -ve to sort the key in descending order
         |          -result
         |       }
         |    }
         |    val sorted2 = rdd.sortByKey()
         |
         |    //Result
         |    sorted2.collect()
         | }
    
    //Default Sorting : Descending order (done using the 'ascending' flag argument)
     val sorted3 = rdd.sortByKey(false)
    
    //Result
    sorted3.collect()
    

    结果:

    rdd: org.apache.spark.rdd.RDD[(String, Int)] = ParallelCollectionRDD[101] at parallelize at command-1784487111427703:1
    sorted1: org.apache.spark.rdd.RDD[(String, Int)] = ShuffledRDD[104] at sortByKey at command-1784487111427703:12
    sorted3: org.apache.spark.rdd.RDD[(String, Int)] = ShuffledRDD[110] at sortByKey at command-1784487111427703:34
    res28: Array[(String, Int)] = Array((science,59), (science,54), (math,55), (math,56), (english,57), (english,58))
    

    前 2

    case class Row(var firstName: String, var lastName: String, var city: String)
    
    var rows = List(new Row("Oscar", "Wilde", "London"),
                    new Row("Otto",  "Swift", "Berlin"),
                    new Row("Carl",  "Swift", "Paris"),
                    new Row("Hans",  "Swift", "Dublin"),
                    new Row("Hugo",  "Swift", "Sligo"))
    
    //print ("sort by last name")
    //rows.sortBy(_.lastName)
    
    
    print ("sort by last name and first name")
    
    rows.sortBy(r => (r.lastName, r.firstName))
    
    
    
    sort by last name and first namedefined class Row
    rows: List[Row] = List(Row(Oscar,Wilde,London), Row(Otto,Swift,Berlin), Row(Carl,Swift,Paris), Row(Hans,Swift,Dublin), Row(Hugo,Swift,Sligo))
    res26: List[Row] = List(Row(Carl,Swift,Paris), Row(Hans,Swift,Dublin), Row(Hugo,Swift,Sligo), Row(Otto,Swift,Berlin), Row(Oscar,Wilde,London))
    

    RDD 与数据集:

    val sqlContext = new org.apache.spark.sql.SQLContext(sc)
    case class MyRecord(time: Double, id: String)
    val rdd = sc.parallelize(1 to 200, 200).flatMap(x =>Seq.fill(10000)(MyRecord(util.Random.nextDouble, "xxx")))
    // sort this RDD by time:
    val sorted = rdd.sortBy(x => x.time)
    result.count
    
    // convert the original RDD to Dataframe and sort again:
    val df = sqlContext.createDataFrame(rdd)
    df.registerTempTable("data")
    val result = sqlContext.sql("select * from data order by time")
    result.count
    

    【讨论】:

      猜你喜欢
      • 1970-01-01
      • 2017-01-29
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 2018-08-09
      • 1970-01-01
      • 1970-01-01
      相关资源
      最近更新 更多