【问题标题】:pySpark, aggregate complex function (difference of consecutive events)pySpark,聚合复杂函数(连续事件的差异)
【发布时间】:2016-12-09 17:22:49
【问题描述】:

我有一个 DataFrame (df),其列是 userid(用户 ID)、day(当天)。

我有兴趣为每个用户计算他/她每天活跃的平均时间间隔。

例如,对于给定的用户,DataFrame 可能看起来像这样

userid       day      
1          2016-09-18        
1          2016-09-20
1          2016-09-25    

如果 DataFrame 是 Pandas DataFrame,我可以像这样计算我感兴趣的数量

import numpy as np
np.mean(np.diff(df[df.userid==1].day))

但是,这非常低效,因为我在 DataFrame 中有数百万用户,但我相信可以这样做

df.groupby("userid").agg({"day": lambda x: np.mean(np.diff(x))})

第一个问题是我不确定这是否能正常工作,因为在应用 np.mean(np.diff(x)) 之前需要对日期进行排序。

相反,第二个问题是效率低下,因为我只能在将 DataFrame 转换为 Pandas DataFrame 时这样做。

有没有办法用 pySpark 做同样的事情?

【问题讨论】:

    标签: python pandas apache-spark pyspark


    【解决方案1】:

    窗口函数来拯救。一些进口:

    from pyspark.sql.functions import col, datediff, lag
    from pyspark.sql.window import Window
    

    窗口定义

    w = Window().partitionBy("userid").orderBy("day")
    

    并查询

    (df
        .withColumn("diff", datediff(lag("day", 1).over(w), "day"))
        .groupBy("userid")
        .mean("diff"))
    

    【讨论】:

      猜你喜欢
      • 2012-04-23
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 2016-11-23
      • 1970-01-01
      • 2019-03-14
      • 2019-05-30
      相关资源
      最近更新 更多