【Question title】: How to group by and merge rows within groups of a Spark DataFrame
【Posted】: 2020-08-20 03:19:51
【Question】:

Suppose I have a table like this:

A  | B  |    C     | D  |  E  | F
x1 | 5  | 20200115 | 15 | 4.5 | 1
x1 | 10 | 20200825 | 15 | 5.6 | 19
x2 | 10 | 20200115 | 15 | 4.1 | 1
x2 | 10 | 20200430 | 15 | 9.1 | 1

I want to merge these rows by column A and produce a DataFrame like this:

A  | B  |    C     | D  |  E  | F
x1 | 15 | 20200825 | 15 | 5.6 | 19
x2 | 10 | 20200115 | 15 | 4.1 | 1
x2 | 10 | 20200430 | 15 | 9.1 | 1

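Basically, if the sum of column B within a group of column A equals the value of column D, then:

  1. The new value of column B is the sum of column B.
  2. Columns C, E and F are taken from the row with the latest value of column C (a date in YYYYmmDD format).

For group x2 the condition does not hold (the sum of column B is 20, which is greater than the 15 in column D), so I want to keep both records in the output.

Assumption: in my data, column D is the same for every row in a given group (15 in this example).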
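To pin down the rule before moving to Spark, here is a minimal plain-Python sketch of it, applied to the sample rows. The function name `merge_groups` and the tuple layout `(A, B, C, D, E, F)` are assumptions made for illustration only. It is not a Spark implementation, just a reference for the expected result.

```python
from collections import defaultdict


def merge_groups(rows):
    """Apply the merge rule to rows shaped (A, B, C, D, E, F)."""
    # Collect rows per value of A, preserving first-seen group order.
    groups = defaultdict(list)
    for row in rows:
        groups[row[0]].append(row)

    result = []
    for key, members in groups.items():
        total_b = sum(r[1] for r in members)
        d_value = members[0][3]  # assumed identical within a group
        if total_b == d_value:
            # C is YYYYmmDD, so the largest value is the most recent date.
            latest = max(members, key=lambda r: r[2])
            result.append((key, total_b, latest[2], d_value, latest[4], latest[5]))
        else:
            # Condition not met: keep every row of the group unchanged.
            result.extend(members)
    return result


rows = [
    ("x1", 5, 20200115, 15, 4.5, 1),
    ("x1", 10, 20200825, 15, 5.6, 19),
    ("x2", 10, 20200115, 15, 4.1, 1),
    ("x2", 10, 20200430, 15, 9.1, 1),
]
merged = merge_groups(rows)
```

Group x1 collapses to a single row with B = 15, while both x2 rows pass through untouched.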

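I have looked at a lot of grouping and windowing (partitioning) examples, but this case seems different to me and I can't narrow down an approach.

Can I pipe the grouped data to a UDF and do something there?

PS: I'm building this in PySpark, so it would be great if your examples were in PySpark.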

【Comments】:

    Tags: pandas apache-spark pyspark apache-spark-sql grouping


    【Solution 1】:

    Try this:

    Use sum and max together with a window function.

    import org.apache.spark.sql.expressions.Window
    import org.apache.spark.sql.functions.{max, sum, when}
    import spark.implicits._ // for the $"col" syntax; assumes a SparkSession named `spark`

    df.show(false)
    df.printSchema()
    /**
      * +---+---+--------+---+---+---+
      * |A  |B  |C       |D  |E  |F  |
      * +---+---+--------+---+---+---+
      * |x1 |5  |20200115|15 |4.5|1  |
      * |x1 |10 |20200825|15 |5.6|19 |
      * |x2 |10 |20200115|15 |4.1|1  |
      * |x2 |10 |20200430|15 |9.1|1  |
      * +---+---+--------+---+---+---+
      *
      * root
      * |-- A: string (nullable = true)
      * |-- B: integer (nullable = true)
      * |-- C: integer (nullable = true)
      * |-- D: integer (nullable = true)
      * |-- E: double (nullable = true)
      * |-- F: integer (nullable = true)
      */

    // Window over every row that shares the same value of A
    val w = Window.partitionBy("A")
    df.withColumn("sum", sum("B").over(w))       // total of B per group
      .withColumn("latestC", max("C").over(w))   // most recent date per group
      // When sum == D keep only the row with the latest C; otherwise keep every row
      .withColumn("retain",
        when($"sum" === $"D", when($"latestC" === $"C", true).otherwise(false))
          .otherwise(true))
      .where($"retain" === true)
      // In a merged group the surviving row takes the group total as its new B
      .withColumn("B", when($"sum" === $"D", when($"latestC" === $"C", $"sum").otherwise($"B"))
        .otherwise($"B"))
      .show(false)

    /**
      * +---+---+--------+---+---+---+---+--------+------+
      * |A  |B  |C       |D  |E  |F  |sum|latestC |retain|
      * +---+---+--------+---+---+---+---+--------+------+
      * |x1 |15 |20200825|15 |5.6|19 |15 |20200825|true  |
      * |x2 |10 |20200115|15 |4.1|1  |20 |20200430|true  |
      * |x2 |10 |20200430|15 |9.1|1  |20 |20200430|true  |
      * +---+---+--------+---+---+---+---+--------+------+
      */


    【Comments】:

      【Solution 2】:

      In PySpark, I would do it like this:

      from pyspark.sql import functions as F, Window as W
      
      b = ["A", "B", "C", "D", "E", "F"]
      a = [
          ("x1", 5, "20200115", 15, 4.5, 1),
          ("x1", 10, "20200825", 15, 5.6, 19),
          ("x2", 10, "20200115", 15, 4.1, 1),
          ("x2", 10, "20200430", 15, 9.1, 1),
      ]
      
      df = spark.createDataFrame(a, b)
      
      
      df = df.withColumn("B_sum", F.sum("B").over(W.partitionBy("A")))
      
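      # Groups whose B total fits within D are merged; the rest are kept as-is.
      # Note: the question states the condition as "sum equals D"; ">=" also
      # merges groups whose total is below D.
      process_df = df.where("D >= B_sum")
      no_process_df = df.where("D < B_sum").drop("B_sum")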
      
      
      # Keep only the row with the latest C in each merged group and use B_sum as B.
      process_df = (
          process_df.withColumn(
              "rng", F.row_number().over(W.partitionBy("A").orderBy(F.col("C").desc()))
          )
          .where("rng=1")
          .select("A", F.col("B_sum").alias("B"), "C", "D", "E", "F")
      )
      
      final_output = process_df.unionByName(no_process_df)
      final_output.show()
      +---+---+--------+---+---+---+
      |  A|  B|       C|  D|  E|  F|
      +---+---+--------+---+---+---+
      | x1| 15|20200825| 15|5.6| 19|
      | x2| 10|20200115| 15|4.1|  1|
      | x2| 10|20200430| 15|9.1|  1|
      +---+---+--------+---+---+---+
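The two solutions use different merge conditions: Solution 1 compares with `===` (sum equals D), while Solution 2 filters on `D >= B_sum`. On the sample data both give the same result, but they diverge when a group's total is below D. The plain-Python sketch below illustrates this. The helper `groups_to_merge` and the extra group `x3` are hypothetical, added only to show the difference.

```python
import operator


def groups_to_merge(rows, compare):
    """Return the A values whose rows would be merged under compare(sum_B, D).

    rows are (A, B, D) tuples; D is assumed constant within a group.
    """
    totals = {}
    limits = {}
    for a, b, d in rows:
        totals[a] = totals.get(a, 0) + b
        limits[a] = d
    return {a for a in totals if compare(totals[a], limits[a])}


rows = [
    ("x1", 5, 15), ("x1", 10, 15),   # total 15, equal to D
    ("x2", 10, 15), ("x2", 10, 15),  # total 20, above D
    ("x3", 4, 15), ("x3", 8, 15),    # hypothetical group: total 12, below D
]

strict = groups_to_merge(rows, operator.eq)  # Solution 1: sum == D
loose = groups_to_merge(rows, operator.le)   # Solution 2: sum <= D
```

Under the strict rule only x1 is merged; under the looser rule x3 is merged as well. Which behavior is wanted depends on how groups with a total below D should be treated, which the question does not specify.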
      
      

      【Comments】:
