【问题标题】:pyspark: rolling average using timeseries data filling zerospyspark:使用时间序列数据填充零的滚动平均值
【发布时间】:2019-11-18 23:28:04
【问题描述】:

我有一个数据集,其中包含几个星期的售罄数据。我想计算一个移动平均线,例如3 周,但考虑到没有销售的那几周。

让我们考虑以下数据:

|------|-------|
|wk_id |sellout|
|------|-------|
|201801|    1.0|
|201802|    5.0|
|201803|    3.0|
|201805|    1.0|
|201806|    5.0|
|------|-------|

我的预期结果是:

|------|-------|-------------|
|wk_id |sellout|moving_avg_3w|
|------|-------|-------------|
|201801|    1.0|0.333        | <- (0+0+1)/3
|201802|    5.0|2.000        | <- (0+1+5)/3
|201803|    3.0|3.000        | <- (1+5+3)/3
|201805|    1.0|1.333        | <- (3+0+1)/3
|201806|    5.0|2.000        | <- (5+1+0)/3
|------|-------|-------------|

我的天真的解决方案是,我用 0 填充缺失的几周,然后使用此处提供的方法:pyspark: rolling average using timeseries data

但如果有大量数据,这似乎不是最高效的方法。谁有更好的解决方案?

这个问题是关于 PySpark 的

【问题讨论】:

  • 不添加缺失的行,我看不到任何可能性。
  • 如果一周不缺,你是否总是每周只有一行?
  • 是的,每周只有一行

标签: apache-spark pyspark window-functions moving-average


【解决方案1】:

因此,将“wk_id”更改为unix_timestamp 后,您实际上可以在rangeBetween 发布的链接中使用window 中的方法,以便在几周之间获得足够的空间。

import pyspark.sql.functions as F
from pyspark.sql.window import Window

# create the df: some wk_id are different to see it works when you change year as well
df = spark.createDataFrame( [ (201801, 1.0), (201802, 5.0), (201804,3.0), 
                              (201851, 3.0), (201852, 1.0), (201901,5.0)], 
                            ['wk_id','sellout'])
# nb_wk you want to roll over
nb_wk = 3

# function to calculate the number of seconds from the number of weeks
wk_to_sec = lambda i: i * 7 * 86400

# create the window of nb_wk
w = Window().orderBy(F.col("sec")).rangeBetween(-wk_to_sec(nb_wk-1), 0)

# add the columns of the number of seconds then the moving average by a sum divide by nb_wk
# the method mean does not work here as there are missing weeks
df = df.withColumn( 'sec', F.unix_timestamp(F.col('wk_id').cast('string'), format="YYYYww"))\
       .withColumn( 'moving_avg_{}w'.format(nb_wk), F.sum('sellout').over(w)/nb_wk)

df.show()
+------+-------+----------+------------------+
| wk_id|sellout|       sec|     moving_avg_3w|
+------+-------+----------+------------------+
|201801|    1.0|1514696400|0.3333333333333333|
|201802|    5.0|1515301200|               2.0|
|201804|    3.0|1516510800|2.6666666666666665| # here it is (5+0+3)/3
|201851|    3.0|1544936400|               1.0|
|201852|    1.0|1545541200|1.3333333333333333|
|201901|    5.0|1546146000|               3.0|  # here it is (3+1+5)/3
+------+-------+----------+------------------+

您可以在之后删除“秒”列,或者如果您不想创建此列,您可以一次完成所有操作:

# create the window of nb_wk with unix_timestamp directly in it
w = Window().orderBy(F.unix_timestamp(F.col('wk_id').cast('string'), format="YYYYww"))
            .rangeBetween(-wk_to_sec(nb_wk-1), 0)
df = df.withColumn( 'moving_avg_{}w'.format(nb_wk), F.sum('sellout').over(w)/nb_wk)

编辑:对于移动标准偏差,我认为您可以这样做,但不确定性能:

df = df.withColumn('std', F.sqrt( (F.sum( (F.col('sellout') - F.last('roll_mean_3w').over(w))**2).over(w) 
                                   + (nb_wk - F.count('sellout').over(w))*F.last('roll_mean_3w').over(w)**2)
                                   /nb_wk)) 

【讨论】:

  • 好的,提出我的问题:我不能应用这种方法来计算标准差,或者?
  • 完全正确。移动平均和移动标准差。
猜你喜欢
  • 2018-01-30
  • 1970-01-01
  • 2016-10-11
  • 2017-08-03
  • 1970-01-01
  • 1970-01-01
  • 2018-09-23
  • 1970-01-01
  • 2021-11-01
相关资源
最近更新 更多