【发布时间】:2018-01-30 01:33:17
【问题描述】:
我有一个由时间戳列和美元列组成的数据集。我想找到以每行时间戳结束的每周平均美元数。我最初查看的是 pyspark.sql.functions.window 函数,但它按周对数据进行分类。
这是一个例子:
%pyspark
import datetime
from pyspark.sql import functions as F
df1 = sc.parallelize([(17,"2017-03-11T15:27:18+00:00"), (13,"2017-03-11T12:27:18+00:00"), (21,"2017-03-17T11:27:18+00:00")]).toDF(["dollars", "datestring"])
df2 = df1.withColumn('timestampGMT', df1.datestring.cast('timestamp'))
w = df2.groupBy(F.window("timestampGMT", "7 days")).agg(F.avg("dollars").alias('avg'))
w.select(w.window.start.cast("string").alias("start"), w.window.end.cast("string").alias("end"), "avg").collect()
这会产生两条记录:
| start | end | avg |
|---------------------|----------------------|-----|
|'2017-03-16 00:00:00'| '2017-03-23 00:00:00'| 21.0|
|---------------------|----------------------|-----|
|'2017-03-09 00:00:00'| '2017-03-16 00:00:00'| 15.0|
|---------------------|----------------------|-----|
窗口函数将时间序列数据分箱,而不是执行滚动平均。
有没有一种方法可以执行滚动平均,我将获得每行的每周平均值,时间段以该行的时间戳 GMT 结束?
编辑:
下面张的回答接近我想要的,但不是我想看到的。
这里有一个更好的例子来展示我想要达到的目的:
%pyspark
from pyspark.sql import functions as F
df = spark.createDataFrame([(17, "2017-03-10T15:27:18+00:00"),
(13, "2017-03-15T12:27:18+00:00"),
(25, "2017-03-18T11:27:18+00:00")],
["dollars", "timestampGMT"])
df = df.withColumn('timestampGMT', df.timestampGMT.cast('timestamp'))
df = df.withColumn('rolling_average', F.avg("dollars").over(Window.partitionBy(F.window("timestampGMT", "7 days"))))
这会产生以下数据框:
dollars timestampGMT rolling_average
25 2017-03-18 11:27:18.0 25
17 2017-03-10 15:27:18.0 15
13 2017-03-15 12:27:18.0 15
我希望在 timestampGMT 列中的日期前一周的平均值,这将导致:
dollars timestampGMT rolling_average
17 2017-03-10 15:27:18.0 17
13 2017-03-15 12:27:18.0 15
25 2017-03-18 11:27:18.0 19
在上述结果中,2017-03-10 的 rolling_average 为 17,因为没有之前的记录。 2017 年 3 月 15 日的 rolling_average 为 15,因为它是 2017 年 3 月 15 日的 13 和 2017 年 3 月 10 日的 17 的平均值,落在前 7 天的窗口内。 2017 年 3 月 18 日的滚动平均值为 19,因为它是 2017 年 3 月 18 日的 25 和 2017 年 3 月 10 日的 13 的平均值,该平均值落在前 7 天窗口内,不包括 2017 年的 17 -03-10,因为这不属于前 7 天的窗口。
有没有办法做到这一点,而不是每周窗口不重叠的分箱窗口?
【问题讨论】:
标签: apache-spark pyspark window-functions moving-average