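- mapPartitions to sort each partition on its own, for example with .sorted
- repartitionAndSortWithinPartitions to sort partitions efficiently while repartitioning at the same time
- sortBy to produce a globally ordered RDD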
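To make the difference between the first and last options concrete, here is a minimal sketch. It assumes a local SparkSession and a small made-up RDD of integers; the object and method names (PartitionSortSketch, sortEachPartition, sortGlobally) are hypothetical and only serve the illustration.

```scala
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.SparkSession

object PartitionSortSketch {
  // Sort the elements inside each partition without moving data between partitions.
  // Note: this buffers a whole partition in memory before sorting it.
  def sortEachPartition(nums: RDD[Int]): RDD[Int] =
    nums.mapPartitions(iter => iter.toArray.sorted.iterator, preservesPartitioning = true)

  // Produce a globally ordered RDD; this shuffles the data.
  def sortGlobally(nums: RDD[Int]): RDD[Int] =
    nums.sortBy(x => x)

  def main(args: Array[String]): Unit = {
    // Local session for illustration only
    val spark = SparkSession.builder().master("local[2]").appName("partition-sort-sketch").getOrCreate()
    val nums = spark.sparkContext.parallelize(Seq(5, 3, 9, 1, 8, 2), numSlices = 2)

    // glom() exposes each partition as an array so the per-partition order is visible
    sortEachPartition(nums).glom().collect().foreach(p => println(p.mkString(", ")))
    println(sortGlobally(nums).collect().mkString(", "))

    spark.stop()
  }
}
```

With mapPartitions each partition is ordered internally, but there is no ordering across partitions; only sortBy gives a total order when the result is collected.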
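As the Spark API documentation says, repartitionAndSortWithinPartitions is more efficient than calling repartition and then sorting within each partition, because it can push the sorting down into the shuffle machinery. In other words, repartitionAndSortWithinPartitions first repartitions the data according to the partitioner you provide and then sorts each partition by key.
So repartitioning first and then sorting each partition will work.
You can achieve the same goal more efficiently with repartitionAndSortWithinPartitions.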
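As a rough sketch of that call, assuming a local SparkSession, a HashPartitioner, and a small (subject, score) pair RDD like the one used in the examples that follow (the object and method names are hypothetical):

```scala
import org.apache.spark.HashPartitioner
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.SparkSession

object RepartitionAndSortSketch {
  // Repartition by key hash and sort every resulting partition by key in a single shuffle.
  // Only the key is sorted; values that share a key are not ordered among themselves.
  def partitionAndSort(pairs: RDD[(String, Int)], numPartitions: Int): RDD[(String, Int)] =
    pairs.repartitionAndSortWithinPartitions(new HashPartitioner(numPartitions))

  def main(args: Array[String]): Unit = {
    // Local session for illustration only
    val spark = SparkSession.builder().master("local[2]").appName("repartition-sort-sketch").getOrCreate()
    val pairs = spark.sparkContext.parallelize(Seq(
      ("math", 55), ("english", 57), ("science", 59),
      ("math", 56), ("english", 58), ("science", 54)))

    // Print the contents of each partition to show the per-partition key order
    partitionAndSort(pairs, 2).glom().collect().zipWithIndex.foreach { case (part, i) =>
      println(s"partition $i: ${part.mkString(", ")}")
    }

    spark.stop()
  }
}
```

All records with the same key land in the same partition, and keys appear in ascending order inside each partition. There is no global order across partitions unless you supply a range-based partitioner.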
Here are some sorting examples; I hope they help.
Example 1
val rdd = sc.parallelize(Seq(
  ("math", 55),
  ("math", 56),
  ("english", 57),
  ("english", 58),
  ("science", 59),
  ("science", 54)))
rdd.collect()
//Default Sorting : Ascending order
val sorted1 = rdd.sortByKey()
sorted1.collect()
//Custom Sorting : Descending order (using implicit 'Ordering')
{
  // Define an implicit Ordering that sortByKey() will pick up.
  // The enclosing braces limit the scope of the implicit ordering.
  implicit val descendingStringOrdering: Ordering[String] = new Ordering[String] {
    override def compare(a: String, b: String): Int = {
      // Negate the natural comparison to sort the keys in descending order
      -a.compare(b)
    }
  }
  val sorted2 = rdd.sortByKey()

  // Result
  sorted2.collect()
}
//Built-in Sorting : Descending order (done using the 'ascending' flag argument)
val sorted3 = rdd.sortByKey(ascending = false)
//Result
sorted3.collect()
Result:
rdd: org.apache.spark.rdd.RDD[(String, Int)] = ParallelCollectionRDD[101] at parallelize at command-1784487111427703:1
sorted1: org.apache.spark.rdd.RDD[(String, Int)] = ShuffledRDD[104] at sortByKey at command-1784487111427703:12
sorted3: org.apache.spark.rdd.RDD[(String, Int)] = ShuffledRDD[110] at sortByKey at command-1784487111427703:34
res28: Array[(String, Int)] = Array((science,59), (science,54), (math,55), (math,56), (english,57), (english,58))
Example 2
case class Row(firstName: String, lastName: String, city: String)
val rows = List(Row("Oscar", "Wilde", "London"),
  Row("Otto", "Swift", "Berlin"),
  Row("Carl", "Swift", "Paris"),
  Row("Hans", "Swift", "Dublin"),
  Row("Hugo", "Swift", "Sligo"))
// println("sort by last name")
// rows.sortBy(_.lastName)
println("sort by last name and first name")
// Sorting by a tuple orders by last name first, then by first name
rows.sortBy(r => (r.lastName, r.firstName))
Result:
sort by last name and first name
defined class Row
rows: List[Row] = List(Row(Oscar,Wilde,London), Row(Otto,Swift,Berlin), Row(Carl,Swift,Paris), Row(Hans,Swift,Dublin), Row(Hugo,Swift,Sligo))
res26: List[Row] = List(Row(Carl,Swift,Paris), Row(Hans,Swift,Dublin), Row(Hugo,Swift,Sligo), Row(Otto,Swift,Berlin), Row(Oscar,Wilde,London))
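If the two fields need different directions, the tuple trick still works when you pass an explicit Ordering. The sketch below is plain Scala (no Spark needed) and sorts by last name descending, then first name ascending. The Person type and the object name are hypothetical stand-ins that mirror the Row case class above.

```scala
object MixedOrderSortSketch {
  // Hypothetical record type mirroring the Row case class in the example
  final case class Person(firstName: String, lastName: String, city: String)

  // Last name descending, then first name ascending
  val byLastDescThenFirstAsc: Ordering[Person] =
    Ordering.by[Person, (String, String)](p => (p.lastName, p.firstName))(
      Ordering.Tuple2(Ordering.String.reverse, Ordering.String))

  val people: List[Person] = List(
    Person("Oscar", "Wilde", "London"),
    Person("Otto", "Swift", "Berlin"),
    Person("Carl", "Swift", "Paris"),
    Person("Hans", "Swift", "Dublin"),
    Person("Hugo", "Swift", "Sligo"))

  def main(args: Array[String]): Unit = {
    // sorted takes the Ordering explicitly, so no implicit needs to be in scope
    people.sorted(byLastDescThenFirstAsc).foreach(println)
  }
}
```

Passing the Ordering explicitly avoids shadowing the default implicit ordering, so other sorts in the same scope are unaffected.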
RDD vs. Dataset:
val sqlContext = new org.apache.spark.sql.SQLContext(sc)
case class MyRecord(time: Double, id: String)
val rdd = sc.parallelize(1 to 200, 200).flatMap(x =>Seq.fill(10000)(MyRecord(util.Random.nextDouble, "xxx")))
// sort this RDD by time (count is an action, so it forces the sort to run):
val sorted = rdd.sortBy(x => x.time)
sorted.count
// convert the original RDD to Dataframe and sort again:
val df = sqlContext.createDataFrame(rdd)
df.registerTempTable("data")
val result = sqlContext.sql("select * from data order by time")
result.count
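For comparison, the same sort can also be expressed through the DataFrame API rather than a SQL string. This is only a sketch under assumptions: it uses SparkSession (Spark 2.x or later) instead of the SQLContext shown here, a much smaller data set, and hypothetical names (DataFrameSortSketch, sortByTime).

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}

object DataFrameSortSketch {
  // Same shape as the MyRecord case class in the snippet
  final case class MyRecord(time: Double, id: String)

  // Build a DataFrame from the records and sort it by the time column
  def sortByTime(spark: SparkSession, records: Seq[MyRecord]): DataFrame =
    spark.createDataFrame(records).orderBy("time")

  def main(args: Array[String]): Unit = {
    // Local session for illustration only
    val spark = SparkSession.builder().master("local[2]").appName("df-sort-sketch").getOrCreate()
    val records = Seq.fill(1000)(MyRecord(scala.util.Random.nextDouble(), "xxx"))

    val result = sortByTime(spark, records)
    println(result.count())
    result.show(5)

    spark.stop()
  }
}
```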