[Posted at]: 2020-03-12 07:50:09
[Problem description]:
I have a PySpark DataFrame whose timestamps have daily granularity. Here is a sample of the DataFrame (call it df):
+-----+-----+----------+-----+
| name| type| timestamp|score|
+-----+-----+----------+-----+
|name1|type1|2012-01-10|   11|
|name1|type1|2012-01-11|   14|
|name1|type1|2012-01-12|    2|
|name1|type3|2012-01-12|    3|
|name1|type3|2012-01-11|   55|
|name1|type1|2012-01-13|   10|
|name1|type2|2012-01-14|   11|
|name1|type2|2012-01-15|   14|
|name2|type2|2012-01-10|    2|
|name2|type2|2012-01-11|    3|
|name2|type2|2012-01-12|   55|
|name2|type1|2012-01-10|   10|
|name2|type1|2012-01-13|   55|
|name2|type1|2012-01-14|   10|
+-----+-----+----------+-----+
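In this DataFrame, I want the sum of the scores for each name over a rolling three-day window. That is, for any given date in the DataFrame and for name1, I want the sum of the scores on that day, the day before it, and the day before that. I want this for every date of name1, and then the same for the other names (name2, and so on). How can I do this?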
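To make the requirement concrete, here is a minimal plain-Python sketch (not PySpark) of the numbers being asked for. The helper name rolling_sums and its window_days parameter are made up for illustration. It assumes the window covers the current day plus the two preceding calendar days, and that scores are summed per name across all types.

```python
from collections import defaultdict
from datetime import date, timedelta

# The sample rows from the DataFrame above: (name, type, timestamp, score)
rows = [
    ("name1", "type1", "2012-01-10", 11),
    ("name1", "type1", "2012-01-11", 14),
    ("name1", "type1", "2012-01-12", 2),
    ("name1", "type3", "2012-01-12", 3),
    ("name1", "type3", "2012-01-11", 55),
    ("name1", "type1", "2012-01-13", 10),
    ("name1", "type2", "2012-01-14", 11),
    ("name1", "type2", "2012-01-15", 14),
    ("name2", "type2", "2012-01-10", 2),
    ("name2", "type2", "2012-01-11", 3),
    ("name2", "type2", "2012-01-12", 55),
    ("name2", "type1", "2012-01-10", 10),
    ("name2", "type1", "2012-01-13", 55),
    ("name2", "type1", "2012-01-14", 10),
]


def rolling_sums(rows, window_days=3):
    """Hypothetical helper: per-name sum over the current day and the
    (window_days - 1) preceding calendar days."""
    # First collapse the rows to one total per (name, day).
    daily = defaultdict(int)
    for name, _type, ts, score in rows:
        daily[(name, date.fromisoformat(ts))] += score

    # Then add up the totals of the days that fall inside each window.
    result = {}
    for name, day in daily:
        result[(name, day.isoformat())] = sum(
            daily.get((name, day - timedelta(days=k)), 0)
            for k in range(window_days)
        )
    return result


for key, total in sorted(rolling_sums(rows).items()):
    print(key, total)
```

Under these assumptions, name1 on 2012-01-13 gets 69 + 5 + 10 = 84: the totals for 2012-01-11 (14 + 55), 2012-01-12 (2 + 3) and 2012-01-13 (10).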
I looked at this post and tried the following:
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql.window import Window
days = lambda i: i*1
w_rolling = Window.orderBy(F.col("timestamp").cast("long")).rangeBetween(-days(3), 0)
df_agg = df.withColumn("rolling_average", F.avg("score").over(w_rolling)).withColumn(
    "rolling_sum", F.sum("score").over(w_rolling)
)
df_agg.show()
+-----+-----+----------+-----+------------------+-----------+
| name| type| timestamp|score|   rolling_average|rolling_sum|
+-----+-----+----------+-----+------------------+-----------+
|name1|type1|2012-01-10|   11|18.214285714285715|        255|
|name1|type1|2012-01-11|   14|18.214285714285715|        255|
|name1|type1|2012-01-12|    2|18.214285714285715|        255|
|name1|type3|2012-01-12|    3|18.214285714285715|        255|
|name1|type3|2012-01-11|   55|18.214285714285715|        255|
|name1|type1|2012-01-13|   10|18.214285714285715|        255|
|name1|type2|2012-01-14|   11|18.214285714285715|        255|
|name1|type2|2012-01-15|   14|18.214285714285715|        255|
|name2|type2|2012-01-10|    2|18.214285714285715|        255|
|name2|type2|2012-01-11|    3|18.214285714285715|        255|
|name2|type2|2012-01-12|   55|18.214285714285715|        255|
|name2|type1|2012-01-10|   10|18.214285714285715|        255|
|name2|type1|2012-01-13|   55|18.214285714285715|        255|
|name2|type1|2012-01-14|   10|18.214285714285715|        255|
+-----+-----+----------+-----+------------------+-----------+
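As you can see, I always get the same rolling average and rolling sum, which are simply the average and the sum of the entire score column. This is not what I want.
You can create the DataFrame above with the following code snippet: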
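from pyspark.sql import Row, SparkSession

# Reuse the active session, or start one if none exists yet
spark = SparkSession.builder.getOrCreate()
df_Stats = Row("name", "type", "timestamp", "score")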
df_stat1 = df_Stats("name1", "type1", "2012-01-10", 11)
df_stat2 = df_Stats("name1", "type1", "2012-01-11", 14)
df_stat3 = df_Stats("name1", "type1", "2012-01-12", 2)
df_stat4 = df_Stats("name1", "type3", "2012-01-12", 3)
df_stat5 = df_Stats("name1", "type3", "2012-01-11", 55)
df_stat6 = df_Stats("name1", "type1", "2012-01-13", 10)
df_stat7 = df_Stats("name1", "type2", "2012-01-14", 11)
df_stat8 = df_Stats("name1", "type2", "2012-01-15", 14)
df_stat9 = df_Stats("name2", "type2", "2012-01-10", 2)
df_stat10 = df_Stats("name2", "type2", "2012-01-11", 3)
df_stat11 = df_Stats("name2", "type2", "2012-01-12", 55)
df_stat12 = df_Stats("name2", "type1", "2012-01-10", 10)
df_stat13 = df_Stats("name2", "type1", "2012-01-13", 55)
df_stat14 = df_Stats("name2", "type1", "2012-01-14", 10)
df_stat_lst = [
    df_stat1,
    df_stat2,
    df_stat3,
    df_stat4,
    df_stat5,
    df_stat6,
    df_stat7,
    df_stat8,
    df_stat9,
    df_stat10,
    df_stat11,
    df_stat12,
    df_stat13,
    df_stat14,
]
df = spark.createDataFrame(df_stat_lst)
[Discussion]:
Tags: python pandas pyspark pyspark-sql pyspark-dataframes