【发布时间】:2017-07-15 20:54:44
【问题描述】:
这可能最容易通过示例来解释。假设我有一个用户登录网站的 DataFrame,例如:
scala> df.show(5)
+----------------+----------+
| user_name|login_date|
+----------------+----------+
|SirChillingtonIV|2012-01-04|
|Booooooo99900098|2012-01-04|
|Booooooo99900098|2012-01-06|
| OprahWinfreyJr|2012-01-10|
|SirChillingtonIV|2012-01-11|
+----------------+----------+
only showing top 5 rows
我想在此添加一列,指示他们何时成为网站上的活跃用户。但是有一个警告:在一段时间内用户被认为是活跃的,在这段时间之后,如果他们再次登录,他们的became_active 日期会重置。假设这段时间是 5 天。那么从上表导出的所需表将是这样的:
+----------------+----------+-------------+
| user_name|login_date|became_active|
+----------------+----------+-------------+
|SirChillingtonIV|2012-01-04| 2012-01-04|
|Booooooo99900098|2012-01-04| 2012-01-04|
|Booooooo99900098|2012-01-06| 2012-01-04|
| OprahWinfreyJr|2012-01-10| 2012-01-10|
|SirChillingtonIV|2012-01-11| 2012-01-11|
+----------------+----------+-------------+
因此,特别是,SirChillingtonIV 的 became_active 日期被重置,因为他们的第二次登录是在活跃期到期后进行的,但是 Booooooo99900098 的 became_active 日期在他/她第二次登录时没有重置,因为它属于活跃期时期。
我最初的想法是使用带有lag 的窗口函数,然后使用lagged 值填充became_active 列;例如,开始大致如下:
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions._
val window = Window.partitionBy("user_name").orderBy("login_date")
val df2 = df.withColumn("tmp", lag("login_date", 1).over(window))
那么,填写became_active 日期的规则是,如果tmp 是null(即,如果它是第一次登录),或者如果login_date - tmp >= 5 那么became_active = login_date;否则,转到tmp 中的下一个最新值并应用相同的规则。这表明了一种递归方法,我无法想象一种实现方法。
我的问题:这是一种可行的方法吗?如果是,我怎样才能“返回”并查看 tmp 的早期值,直到找到一个我停下来的地方?据我所知,我无法遍历 Spark SQL Column 的值。有没有其他方法可以达到这个结果?
【问题讨论】:
标签: sql apache-spark pyspark apache-spark-sql window-functions