【问题标题】:Spark SQL first and last aggregated functions -- Unexpected ResultSpark SQL first 和 last 聚合函数——意外结果
【发布时间】:2019-09-08 18:48:33
【问题描述】:

在 Spark Dataframe 上执行第一个和最后一个聚合函数时出现意外结果。

我有一个包含 colA、colB、colC、colD、colE、extraCol1、extraCol2 列的 spark 数据框

我需要通过

对这个数据框进行聚合

分组 -> colA & colB,max -> colC,max -> colD,first -> colE, extraCol1, extraCol2

所以下面是我正在使用的数据帧(df),我正在使用火花分区(3)

colA    colB    colC    colD    colE    extraCol1   extracol2
Harshit 23        43    44         A           q    z
Mohit   24        56    62         B           w    x
Harshit 23        32    44         C           e    c
Kali    10        20    460        D           r    v
Aman    20        30    180        E           t    b
Ram     30        100   270        F          yu    n
Kali    10        600   360        G          io    m
Kali    10        600   460        k           p    o

下面是我用来执行groupBy操作的scala和spark代码

 val cols = List("colA","colB")

 var  aggFuncSeq = List(max(`colC`) as colC_new, max(`colD`) as colD_new, first(`colE`,true) as colE, first(`extracol2`,true) as extracol2, first(`extraCol1`,true) as extraCol1)

 var aggFuncs = aggFuncSeq.map(e => expr(e))

 df = df.groupBy(cols.head, cols.tail: _*).agg(aggFuncs.head, aggFuncs.tail: _*)

 df.show(10)

执行后,我得到了意想不到的结果,如下所示。

colA    colB    colC_new    colD_new    colE    extracol2   extraCol1
Harshit     23      43            44        C       c       e
Aman        20      30            180       E       b       t
Kali        10      600           460       D       v       r
Ram         30      100           270       F       n       yu
Mohit       24      56            62        B       x       w

但是根据所执行的分组条件和聚合操作,输出结果应该有对应于Harshit的第一行,对于colE,extracol2,extracol1

所以,预期的结果如下

colA    colB    colC_new    colD_new    colE    extracol2   extraCol1
Harshit     23      43            44        A       q       z
Aman        20      30            180       E       b       t
Kali        10      600           460       D       v       r
Ram         30      100           270       F       n       yu
Mohit       24      56            62        B       x       w

但我无法理解这个 SQL 概念,这怎么可能。所以,如果有人能帮我解决这个奇怪的问题。

是因为分区吗?

它是如何给出这个结果的以及如何将它修复为预期的结果?

感谢任何帮助。 谢谢

【问题讨论】:

  • 首先是窗口函数,直到你按它排序才会给你预期的结果。你需要做一些像 Window.partitionBy(colA,colB).orderBy(colE))
  • @sp_user123 然后我也可以做 max(colE) ,它会给我同样的结果。但我想根据用户提供的输入数据框获取第一列或最后一列
  • orderby(colE) 只是一个例子,在你的情况下,我认为你需要使用与 group coulmns 相同的键(desc 或 asc)
  • 您可能需要在进行任何改组之前添加一列 F.monotonically_increasing_id
  • @TarunKhaneja 请查看下面的选项,如果适合您的需求/问题,请不要忘记接受答案。谢谢!

标签: scala apache-spark dataframe apache-spark-sql


【解决方案1】:

当您在 Spark 中groupBy 时,您可以更改 DataFrame 的顺序。但并非总是如此(例如,如果您的数据包含在单个工作人员中,则不会更改)。因此,为了确保并获得可扩展的解决方案,您需要在窗口函数中重新排序。

在这种情况下,试试这个:

val w = Window.partitionBy($"key").orderBy($"value")
df
  .withColumn("row_number", row_number.over(w))
  .where($"row_number" === 1)
  .drop("row_number")

这仅选择第一行,在row_number 上过滤,row_number 定义为排序后行的索引。之后将其丢弃,因为它变得无用。

备注:您可以将 $ 运算符替换为 col 运算符。它只是更简洁代码的捷径。

【讨论】:

  • 刚刚更新了我的答案,如果这不起作用,请告诉我。
【解决方案2】:
 import org.apache.spark.sql.functions.{max, _}
    import spark.implicits._

    val columnsDF = Seq(
      ("Harshit", 23, 43, 44, "A", "q", "z"),
      ("Mohit", 24, 56, 62, "B", "w", "x"),
      ("Harshit", 23, 32, 44, "C", "e", "c"),
      ("Kali", 10, 20, 460, "D", "r", "v"),
      ("Aman", 20, 30, 180, "E", "t", "b"),
      ("Ram", 30, 100, 270, "F", "yu", "n"),
      ("Kali", 10, 600, 360, "G", "io", "m"),
      ("Kali", 10, 600, 460, "k", "p", "o")
    ).toDF("ColA", "ColB", "ColC", "ColD", "ColE", "extraCol1", "extraCol2")


    println("Before Aggregation")
    columnsDF.show()

    val cols = List("colA", "colB")

    println("After Aggregation")
    val aggSeqFunction = columnsDF.agg(max(columnsDF.columns(2)),
      max(columnsDF.columns(3)),
      first(columnsDF.columns(4)),
      first(columnsDF.columns(6)),
      first(columnsDF.columns(5)))

    val aggFunction = aggSeqFunction.columns.map(en => expr(en))


    columnsDF.groupBy(cols.head, cols.tail: _*).agg(aggFunction.head, aggFunction.tail: _*).show()

    /*
            +-------+----+---------+---------+------------------+-----------------------+-----------------------+
            |   colA|colB|max(ColC)|max(ColD)|first(ColE, false)|first(extraCol2, false)|first(extraCol1, false)|
            +-------+----+---------+---------+------------------+-----------------------+-----------------------+
            |Harshit|  23|       43|       44|                 A|                      z|                      q|
            |   Aman|  20|       30|      180|                 E|                      b|                      t|
            |   Kali|  10|      600|      460|                 D|                      v|                      r|
            |    Ram|  30|      100|      270|                 F|                      n|                     yu|
            |  Mohit|  24|       56|       62|                 B|                      x|                      w|
            +-------+----+---------+---------+------------------+-----------------------+-----------------------+
     */

我能够得出预期的结果。

希望这会有所帮助。

【讨论】:

  • 这并不是因为它适用于您的机器,因此该选项在集群上是可扩展的。在groupBy 之后,您无法保证您的DataFrame 将保持排序状态。
猜你喜欢
  • 2017-06-24
  • 1970-01-01
  • 1970-01-01
  • 2014-06-01
  • 2023-04-02
  • 1970-01-01
  • 1970-01-01
  • 2016-10-06
  • 2021-02-03
相关资源
最近更新 更多