【问题标题】:Spark SQL: Window function lag until a condition metSpark SQL:窗口函数滞后直到满足条件
【发布时间】:2019-04-09 16:52:58
【问题描述】:

我正在 Spark 中处理这个数据集:

+------------+------------+------------+
|     ColumnA|     ColumnB|     Result |
+------------+------------+------------+
|      ABCDEF|    MNOPQRST|      true  |
|      123455|      UVWXYZ|      false |
|      ABCDEF|    MNOPQRST|      false | (should be true)
|      123455|      UVWXYZ|      false |
|      123455|      UVWXYZ|      false |
|      ABCDEF|    EFGHIJK |      false |
+------------+------------+------------+

规则是:

  1. 如果给定分区集 Result 的等级为 1,则为 true。
  2. 如果等级不是 1 且 ColumnA 的值为 123455,则将 Result 的值设置为 false
  3. 如果排名不是 1 且 ColumnA 值不是 123455 并且如果 ColumnB 值与前一行的 ColumnB 值匹配,则将 Result 设置为 true。确保上一行的ColumnA的值不是123455

    WindowSpec w = Window.partitionBy("ColumnA, ColumnB");

    列 matchColumnB = functions.col("ColumnB").equalTo( functions.lag("ColumnB", 1).over(w));

这里窗口函数检查上一行而不考虑上一行的ColumnA值。

例如在上面的数据集中,第 3 行的 ColumnB 值应该与第 1 行而不是第 2 行进行比较。

我尝试查看Window.unboundedPreceding,但不确定如何在这种情况下使用它。

有没有办法做到这一点?

【问题讨论】:

    标签: java apache-spark apache-spark-sql


    【解决方案1】:

    复制 DF:

    val df = sc.parallelize(List(("ABCDEF","MNOPQRST"), 
                        ("123455","UVWXYZ"),
                        ("ABCDEF","MNOPQRST"),
                        ("123455","UVWXYZ"),
                        ("123455","UVWXYZ"), 
                        ("ABCDEF","EFGHIJK")))
       .toDF("ColumnA","ColumnB")
    

    提供的信息中存在一些矛盾,例如,您的窗口实现使得无法应用上述条件。

    在根据行的顺序[排名和与前一行的比较]进行工作时,窗口分析有一些基本要素

    1. 您需要定义适当的分区列。如果窗口被columnAcolumnB 划分,那么它们的值对于给定的窗口将保持不变。因此,如果需要在leadlag 行之间比较columnAcolumnB,则DF 需要按其他列进行分区。 举例说明为什么它是问题

      val w = Window.partitionBy("ColumnA", "ColumnB").orderBy("ColumnA", "ColumnB");
      df.withColumn("rank", rank.over(w)).show
      +-------+--------+----+
      |ColumnA| ColumnB|rank|
      +-------+--------+----+
      | ABCDEF| EFGHIJK|   1|
      | ABCDEF|MNOPQRST|   1|
      | ABCDEF|MNOPQRST|   1|
      | 123455|  UVWXYZ|   1|
      | 123455|  UVWXYZ|   1|
      | 123455|  UVWXYZ|   1|
      +-------+--------+----+
      

      现在每一行都充当自己的窗口。注意顺序,在第2点解释。

    2. 窗口中还需要具体的order by 语句。如果没有rank,“滞后”、“领先”等将变得不确定,因此没有多大意义。 Spark 会尝试保护它,如果没有 order by 子句,窗口函数抛出异常。 举例说明为什么它是问题

      val w = Window.partitionBy("ColumnA", "ColumnB")
      df.withColumn("result", lag("columnB", 1).over(w))
      

      导致:

      org.apache.spark.sql.AnalysisException: Window function lag('columnB, 1, null) requires window to be ordered, please add ORDER BY clause. For example SELECT lag('columnB, 1, null)(value_expr) OVER (PARTITION BY window_partition ORDER BY window_ordering) from table;
      

    解决方案 回答这个问题本身:我将为您的问题考虑另外两列。

    val df = sc.parallelize(List(("ABCDEF","MNOPQRST", "P1", "1"), 
                            ("123455","UVWXYZ", "P1", "2"),
                            ("ABCDEF","MNOPQRST", "P1", "3"),
                            ("123455","UVWXYZ", "P1", "4"),
                            ("123455","UVWXYZ", "P1", "5"), 
                            ("BLABLAH","UVWXYZ", "P1", "6"),
                            ("ABCDEF","EFGHIJK", "P1", "7")))
           .toDF("ColumnA","ColumnB", "ColumnP", "ColumnO")
    +-------+--------+-------+-------+
    |ColumnA| ColumnB|ColumnP|ColumnO|
    +-------+--------+-------+-------+
    | ABCDEF|MNOPQRST|     P1|      1|
    | 123455|  UVWXYZ|     P1|      2|
    | ABCDEF|MNOPQRST|     P1|      3|
    | 123455|  UVWXYZ|     P1|      4|
    | 123455|  UVWXYZ|     P1|      5|
    |BLABLAH|  UVWXYZ|     P1|      5|
    | ABCDEF| EFGHIJK|     P1|      6|
    +-------+--------+-------+-------+
    

    这里,分区列是columnP,按列排序是ColumnO

    val w = Window.partitionBy("ColumnP").orderBy("ColumnO")
    val dfWithWindowing = df.withColumn("lag_columnB", lag("columnB", 1).over(w))
                            .withColumn("rank", rank().over(w))
    dfWithWindowing.show
    +-------+--------+-------+-------+-----------+----+
    |ColumnA| ColumnB|ColumnP|ColumnO|lag_columnB|rank|
    +-------+--------+-------+-------+-----------+----+
    | ABCDEF|MNOPQRST|     P1|      1|       null|   1|
    | 123455|  UVWXYZ|     P1|      2|   MNOPQRST|   2|
    | ABCDEF|MNOPQRST|     P1|      3|     UVWXYZ|   3|
    | 123455|  UVWXYZ|     P1|      4|   MNOPQRST|   4|
    | 123455|  UVWXYZ|     P1|      5|     UVWXYZ|   5|
    |BLABLAH|  UVWXYZ|     P1|      6|     UVWXYZ|   6|
    | ABCDEF| EFGHIJK|     P1|      7|     UVWXYZ|   7|
    +-------+--------+-------+-------+-----------+----+
    

    现在我们拥有了执行所需计算所需的所有信息。当结果不满足任何条件时,规则中没有关于结果值的规范,实现认为这是真的。

    val resultDF = dfWithWindowing.withColumn("result", when($"rank"==="1",true).otherwise(
                                  when($"ColumnA"==="123455", false).otherwise(
                                        when($"ColumnB"===$"lag_columnB", true).otherwise(true)
                                     )
                                  )
                              ).drop("ColumnP", "ColumnO","lag_columnB","rank")
    +-------+--------+------+
    |ColumnA| ColumnB|result|
    +-------+--------+------+
    | ABCDEF|MNOPQRST|  true|
    | 123455|  UVWXYZ| false|
    | ABCDEF|MNOPQRST|  true|
    | 123455|  UVWXYZ| false|
    | 123455|  UVWXYZ| false|
    |BLABLAH|  UVWXYZ|  true|
    | ABCDEF| EFGHIJK|  true|
    +-------+--------+------+
    

    想了解更多开窗知识,请参考https://databricks.com/blog/2015/07/15/introducing-window-functions-in-spark-sql.html

    【讨论】:

      猜你喜欢
      • 2017-04-30
      • 1970-01-01
      • 2016-07-23
      • 2017-01-23
      • 1970-01-01
      • 2022-10-06
      • 1970-01-01
      • 2019-12-23
      • 1970-01-01
      相关资源
      最近更新 更多