【问题标题】:Spark SQL window function with complex condition具有复杂条件的 Spark SQL 窗口函数
【发布时间】:2017-07-15 20:54:44
【问题描述】:

这可能最容易通过示例来解释。假设我有一个用户登录网站的 DataFrame,例如:

scala> df.show(5)
+----------------+----------+
|       user_name|login_date|
+----------------+----------+
|SirChillingtonIV|2012-01-04|
|Booooooo99900098|2012-01-04|
|Booooooo99900098|2012-01-06|
|  OprahWinfreyJr|2012-01-10|
|SirChillingtonIV|2012-01-11|
+----------------+----------+
only showing top 5 rows

我想在此添加一列,指示他们何时成为网站上的活跃用户。但是有一个警告:在一段时间内用户被认为是活跃的,在这段时间之后,如果他们再次登录,他们的became_active 日期会重置。假设这段时间是 5 天。那么从上表导出的所需表将是这样的:

+----------------+----------+-------------+
|       user_name|login_date|became_active|
+----------------+----------+-------------+
|SirChillingtonIV|2012-01-04|   2012-01-04|
|Booooooo99900098|2012-01-04|   2012-01-04|
|Booooooo99900098|2012-01-06|   2012-01-04|
|  OprahWinfreyJr|2012-01-10|   2012-01-10|
|SirChillingtonIV|2012-01-11|   2012-01-11|
+----------------+----------+-------------+

因此,特别是,SirChillingtonIV 的 became_active 日期被重置,因为他们的第二次登录是在活跃期到期后进行的,但是 Booooooo99900098 的 became_active 日期在他/她第二次登录时没有重置,因为它属于活跃期时期。

我最初的想法是使用带有lag 的窗口函数,然后使用lagged 值填充became_active 列;例如,开始大致如下:

import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions._

val window = Window.partitionBy("user_name").orderBy("login_date")
val df2 = df.withColumn("tmp", lag("login_date", 1).over(window))

那么,填写became_active 日期的规则是,如果tmpnull(即,如果它是第一次登录),或者如果login_date - tmp >= 5 那么became_active = login_date;否则,转到tmp 中的下一个最新值并应用相同的规则。这表明了一种递归方法,我无法想象一种实现方法。

我的问题:这是一种可行的方法吗?如果是,我怎样才能“返回”并查看 tmp 的早期值,直到找到一个我停下来的地方?据我所知,我无法遍历 Spark SQL Column 的值。有没有其他方法可以达到这个结果?

【问题讨论】:

    标签: sql apache-spark pyspark apache-spark-sql window-functions


    【解决方案1】:

    火花 >= 3.2

    最近的 Spark 版本为批处理和结构化流式查询中的会话窗口提供原生支持(请参阅 SPARK-10816 及其子任务,尤其是 SPARK-34893)。

    官方文档提供了不错的usage example

    火花

    这是诀窍。导入一堆函数:

    import org.apache.spark.sql.expressions.Window
    import org.apache.spark.sql.functions.{coalesce, datediff, lag, lit, min, sum}
    

    定义窗口:

    val userWindow = Window.partitionBy("user_name").orderBy("login_date")
    val userSessionWindow = Window.partitionBy("user_name", "session")
    

    找到新会话的开始点:

    val newSession =  (coalesce(
      datediff($"login_date", lag($"login_date", 1).over(userWindow)),
      lit(0)
    ) > 5).cast("bigint")
    
    val sessionized = df.withColumn("session", sum(newSession).over(userWindow))
    

    查找每个会话的最早日期:

    val result = sessionized
      .withColumn("became_active", min($"login_date").over(userSessionWindow))
      .drop("session")
    

    数据集定义为:

    val df = Seq(
      ("SirChillingtonIV", "2012-01-04"), ("Booooooo99900098", "2012-01-04"),
      ("Booooooo99900098", "2012-01-06"), ("OprahWinfreyJr", "2012-01-10"), 
      ("SirChillingtonIV", "2012-01-11"), ("SirChillingtonIV", "2012-01-14"),
      ("SirChillingtonIV", "2012-08-11")
    ).toDF("user_name", "login_date")
    

    结果是:

    +----------------+----------+-------------+
    |       user_name|login_date|became_active|
    +----------------+----------+-------------+
    |  OprahWinfreyJr|2012-01-10|   2012-01-10|
    |SirChillingtonIV|2012-01-04|   2012-01-04| <- The first session for user
    |SirChillingtonIV|2012-01-11|   2012-01-11| <- The second session for user
    |SirChillingtonIV|2012-01-14|   2012-01-11| 
    |SirChillingtonIV|2012-08-11|   2012-08-11| <- The third session for user
    |Booooooo99900098|2012-01-04|   2012-01-04|
    |Booooooo99900098|2012-01-06|   2012-01-04|
    +----------------+----------+-------------+
    

    【讨论】:

    • 我知道已经很久了,但你能帮我理解解决方案的合并部分吗??
    • @SanchitGrover 如果datediff($"login_date", lag($"login_date", 1).over(userWindow)) 的计算结果为null(帧中的第一行),则为 0。
    • 那么这个val sessionized = df.withColumn("session", sum(newSession).over(userWindow))是怎么增加计数的呢?
    • 集合 {0, 1} 中值的累积和。
    【解决方案2】:

    重构 the other answer 以使用 Pyspark

    Pyspark 你可以像下面这样。

    create data frame

    df = sqlContext.createDataFrame(
    [
    ("SirChillingtonIV", "2012-01-04"), 
    ("Booooooo99900098", "2012-01-04"), 
    ("Booooooo99900098", "2012-01-06"), 
    ("OprahWinfreyJr", "2012-01-10"), 
    ("SirChillingtonIV", "2012-01-11"), 
    ("SirChillingtonIV", "2012-01-14"), 
    ("SirChillingtonIV", "2012-08-11")
    ], 
    ("user_name", "login_date"))
    

    上面的代码创建了一个如下所示的数据框

    +----------------+----------+
    |       user_name|login_date|
    +----------------+----------+
    |SirChillingtonIV|2012-01-04|
    |Booooooo99900098|2012-01-04|
    |Booooooo99900098|2012-01-06|
    |  OprahWinfreyJr|2012-01-10|
    |SirChillingtonIV|2012-01-11|
    |SirChillingtonIV|2012-01-14|
    |SirChillingtonIV|2012-08-11|
    +----------------+----------+
    

    现在我们要先找出login_date 之间的差异大于5 天数。

    为此,请执行以下操作。

    必要的进口

    from pyspark.sql import functions as f
    from pyspark.sql import Window
    
    
    # defining window partitions  
    login_window = Window.partitionBy("user_name").orderBy("login_date")
    session_window = Window.partitionBy("user_name", "session")
    
    session_df = df.withColumn("session", f.sum((f.coalesce(f.datediff("login_date", f.lag("login_date", 1).over(login_window)), f.lit(0)) > 5).cast("int")).over(login_window))
    

    当我们运行上述代码行时,如果date_diffNULL,那么coalesce 函数会将NULL 替换为0

    +----------------+----------+-------+
    |       user_name|login_date|session|
    +----------------+----------+-------+
    |  OprahWinfreyJr|2012-01-10|      0|
    |SirChillingtonIV|2012-01-04|      0|
    |SirChillingtonIV|2012-01-11|      1|
    |SirChillingtonIV|2012-01-14|      1|
    |SirChillingtonIV|2012-08-11|      2|
    |Booooooo99900098|2012-01-04|      0|
    |Booooooo99900098|2012-01-06|      0|
    +----------------+----------+-------+
    
    
    # add became_active column by finding the `min login_date` for each window partitionBy `user_name` and `session` created in above step
    final_df = session_df.withColumn("became_active", f.min("login_date").over(session_window)).drop("session")
    
    +----------------+----------+-------------+
    |       user_name|login_date|became_active|
    +----------------+----------+-------------+
    |  OprahWinfreyJr|2012-01-10|   2012-01-10|
    |SirChillingtonIV|2012-01-04|   2012-01-04|
    |SirChillingtonIV|2012-01-11|   2012-01-11|
    |SirChillingtonIV|2012-01-14|   2012-01-11|
    |SirChillingtonIV|2012-08-11|   2012-08-11|
    |Booooooo99900098|2012-01-04|   2012-01-04|
    |Booooooo99900098|2012-01-06|   2012-01-04|
    +----------------+----------+-------------+
    

    【讨论】:

      猜你喜欢
      • 2018-01-06
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 2022-01-12
      • 2016-02-11
      • 1970-01-01
      相关资源
      最近更新 更多