【问题标题】:How to collect and process column-wise data in Spark如何在 Spark 中收集和处理列数据
【发布时间】:2018-01-06 15:46:34
【问题描述】:

我有一个包含 7 天 24 小时数据的数据框,因此它有 144 列。

id     d1h1  d1h2   d1h3 .....  d7h24 
aaa    21     24     8   .....   14       
bbb    16     12     2   .....   4
ccc    21      2     7   .....   6

我想做的是找到每天最多 3 个值:

id    d1        d2       d3  ....   d7
aaa  [22,2,2] [17,2,2] [21,8,3]    [32,11,2]
bbb  [32,22,12] [47,22,2] [31,14,3]    [32,11,2]
ccc  [12,7,4] [28,14,7] [11,2,1]    [19,14,7] 

【问题讨论】:

    标签: scala apache-spark spark-dataframe


    【解决方案1】:
    import org.apache.spark.sql.functions._
    var df = ...
    val first3 = udf((list : Seq[Double]) => list.slice(0,3))
    for (i <- 1 until 7) {
        val columns = (1 until 24).map(x=> "d"+i+"h"+x)
        df = df
            .withColumn("d"+i, first3(sort_array(array(columns.head, columns.tail :_*), false)))
            .drop(columns :_*)
    }
    

    这应该给你你想要的。事实上,我每天将 24 小时汇总到一个数组列中,按 desc 顺序排序,然后从中选择前 3 个元素。

    【讨论】:

    • 似乎无法通过,因为 first3(sort_array(array(columns.head, columns.tail :_*), false)) 返回排序后的列名,而不是值
    • org.apache.spark.SparkException: 无法执行用户定义函数(anonfun$1: (array) => array)
    【解决方案2】:

    定义模式:

    val p = "^(d[1-7])h[0-9]{1,2}$".r
    

    分组列:

    import org.apache.spark.sql.functions._
    
    val cols = df.columns.tail
      .groupBy { case p(d) => d }
      .map { case (c, cs) =>  {
        val sorted = sort_array(array(cs map col: _*), false)
        array(sorted(0), sorted(1), sorted(2)).as(c)
      }}
    

    然后选择:

    df.select($"id" +: cols.toSeq: _*)
    

    【讨论】:

      猜你喜欢
      • 1970-01-01
      • 2017-02-10
      • 2016-11-04
      • 2018-05-26
      • 2018-04-30
      • 1970-01-01
      • 1970-01-01
      • 2016-01-27
      • 2021-01-01
      相关资源
      最近更新 更多