【Question Title】: Spark Scala: Aggregate DataFrame Column Values into an Ordered List
【Posted】: 2017-03-10 18:10:54
【Question】:

I have a Spark Scala DataFrame with four columns: (id, day, val, order). I want to create a new DataFrame with the columns (id, day, value_list: List(val1, val2, ..., valn)), where val1 through valn are sorted in ascending order of the order column.

For example:

(50, 113, 1, 1), 
(50, 113, 1, 3), 
(50, 113, 2, 2), 
(51, 114, 1, 2), 
(51, 114, 2, 1), 
(51, 113, 1, 1)

would become:

((51,113),List(1))
((51,114),List(2, 1))
((50,113),List(1, 2, 1))

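I'm close, but I don't know what to do once the data has been aggregated into lists. I'm not sure how to get Spark to sort each list of values by the order integer:

import org.apache.spark.sql.Row
import sqlContext.implicits._  // provides toDF; already in scope inside spark-shell

val testList = List((50, 113, 1, 1), (50, 113, 1, 3), (50, 113, 2, 2), (51, 114, 1, 2), (51, 114, 2, 1), (51, 113, 1, 1))
val testDF = sqlContext.sparkContext.parallelize(testList).toDF("id1", "id2", "val", "order")

// Key each row by (id1, id2) and wrap (val, order) in a one-element list.
// Going through .rdd keeps this working on Spark versions where DataFrame.map returns a Dataset.
val rDD1 = testDF.rdd.map{case Row(key1: Int, key2: Int, val1: Int, val2: Int)  => ((key1, key2), List((val1, val2)))}
// Concatenate the per-key lists; the resulting element order is not guaranteed
val rDD2 = rDD1.reduceByKey{case (x, y) =>  x ++ y}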

The output looks like:

((51,113),List((1,1)))
((51,114),List((1,2), (2,1)))
((50,113),List((1,3), (1,1), (2,2)))

The next step is to produce:

((51,113),List((1,1)))
((51,114),List((2,1), (1,2)))
((50,113),List((1,1), (2,2), (1,3)))

【Comments】:

    Tags: scala apache-spark


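    【Solution 1】:

    You just need to map over your RDD and sort each list with sortBy, using the second tuple element (order) as the key: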

    scala> val df = Seq((50, 113, 1, 1), (50, 113, 1, 3), (50, 113, 2, 2), (51, 114, 1, 2), (51, 114, 2, 1), (51, 113, 1, 1)).toDF("id1", "id2", "val", "order")
    df: org.apache.spark.sql.DataFrame = [id1: int, id2: int, val: int, order: int]
    
    scala> import org.apache.spark.sql.Row
    import org.apache.spark.sql.Row
    
    scala> val rDD1 = df.map{case Row(key1: Int, key2: Int, val1: Int, val2: Int)  => ((key1, key2), List((val1, val2)))}
    rDD1: org.apache.spark.rdd.RDD[((Int, Int), List[(Int, Int)])] = MapPartitionsRDD[10] at map at <console>:28
    
    scala> val rDD2 = rDD1.reduceByKey{case (x, y) =>  x ++ y}
    rDD2: org.apache.spark.rdd.RDD[((Int, Int), List[(Int, Int)])] = ShuffledRDD[11] at reduceByKey at <console>:30
    
    scala> val rDD3 = rDD2.map(x => (x._1, x._2.sortBy(_._2)))
    rDD3: org.apache.spark.rdd.RDD[((Int, Int), List[(Int, Int)])] = MapPartitionsRDD[12] at map at <console>:32
    
    scala> rDD3.collect.foreach(println)
    ((51,113),List((1,1)))
    ((50,113),List((1,1), (2,2), (1,3)))
    ((51,114),List((2,1), (1,2)))
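
The transcript above leaves each list as (val, order) pairs, while the question asks for a list of the values alone. As a minimal sketch of that last step, the snippet below uses plain Scala collections (no Spark) to group the sample rows, sort each group by order, and keep only val. The names `OrderedValues` and `orderedValues` are hypothetical and chosen only for illustration.

```scala
object OrderedValues {
  // Each row is (id1, id2, value, order), matching the sample data in the question.
  def orderedValues(rows: Seq[(Int, Int, Int, Int)]): Map[(Int, Int), List[Int]] =
    rows
      .groupBy { case (id1, id2, _, _) => (id1, id2) }   // one group per (id1, id2) key
      .map { case (key, group) =>
        // Sort the group by the order column, then drop everything except the value.
        key -> group.sortBy(_._4).map(_._3).toList
      }

  def main(args: Array[String]): Unit = {
    val testList = List(
      (50, 113, 1, 1), (50, 113, 1, 3), (50, 113, 2, 2),
      (51, 114, 1, 2), (51, 114, 2, 1), (51, 113, 1, 1))
    orderedValues(testList).foreach(println)
  }
}
```

On the RDD from the transcript, the same idea would presumably be a one-line change to the final step, for example `rDD2.mapValues(_.sortBy(_._2).map(_._1))`, which sorts by order and then projects out val.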
    

    【Comments】:

      【Solution 2】:
      import org.apache.spark.sql.functions.collect_list
      testDF.groupBy("id1","id2").agg(collect_list($"val")).show
      +---+---+-----------------+                                                     
      |id1|id2|collect_list(val)|
      +---+---+-----------------+
      | 51|113|              [1]|
      | 51|114|           [1, 2]|
      | 50|113|        [1, 1, 2]|
      +---+---+-----------------+
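
Note that collect_list on its own does not take the order column into account: the table above shows [1, 2] for (51, 114), where the question expects [2, 1]. One possible way to stay in the DataFrame API, sketched here under the assumption of Spark 2.x or later with a `SparkSession`, is to collect (order, val) structs, sort them with `sort_array`, and then select the val field out of the sorted array. The names `OrderedCollect`, `orderedLists`, `pairs`, and `value_list` are hypothetical.

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.functions.{col, collect_list, sort_array, struct}

object OrderedCollect {
  // Expects columns id1, id2, val, order (the names used in the question).
  def orderedLists(df: DataFrame): DataFrame =
    df.groupBy("id1", "id2")
      // Put order first in the struct so sort_array compares on it before val.
      .agg(sort_array(collect_list(struct(col("order"), col("val")))).as("pairs"))
      // Selecting a field of an array of structs yields an array of that field.
      .select(col("id1"), col("id2"), col("pairs.val").as("value_list"))

  def main(args: Array[String]): Unit = {
    // Assumption: a local session is acceptable for trying this out.
    val spark = SparkSession.builder().master("local[*]").appName("ordered-collect").getOrCreate()
    import spark.implicits._

    val testDF = Seq(
      (50, 113, 1, 1), (50, 113, 1, 3), (50, 113, 2, 2),
      (51, 114, 1, 2), (51, 114, 2, 1), (51, 113, 1, 1)
    ).toDF("id1", "id2", "val", "order")

    orderedLists(testDF).show(false)
    spark.stop()
  }
}
```

Sorting after aggregation avoids relying on the row order that collect_list happens to see, which Spark does not guarantee after a shuffle.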
      

      【Comments】:
