【Question title】: pyspark: rolling average using custom timeseries data
【Posted】: 2020-01-21 05:32:37
【Question】:

Hi, my base dataframe looks like this:

+-------+---------+----------+
|stockId|timeStamp|stockPrice|
+-------+---------+----------+
|    101|        1|      53.0|
|    101|        2|      15.0|
|    101|        3|      57.0|
|    101|        4|      71.0|
|    101|        5|      86.0|
+-------+---------+----------+

Here is my code. It converts days to seconds, then builds the window and computes the average over it.

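from pyspark.sql import Window, functions as F

days = lambda i: i * 86400  # days -> seconds (24*60*60)
W = (Window.partitionBy(F.col('stockId'))
     .orderBy(F.col('epoch_time').cast("timestamp").cast("long"))
     .rangeBetween(-days(3), 0))
# df is the base dataframe above; timestamp is a 'yyyy-MM-dd HH:mm:ss' string (both names assumed)
Df = (df.withColumn("current_timestamp", F.unix_timestamp(F.lit(timestamp), 'yyyy-MM-dd HH:mm:ss').cast("timestamp"))
      .withColumn("epoch", F.unix_timestamp("current_timestamp"))
      .withColumn("epoch_time", F.col("epoch") + F.col("timeStamp"))  # keep numeric; F.concat would yield a string
      .withColumn("moving_avg", F.avg("stockPrice").over(W)))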
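To see why a 3-day range cannot produce a three-row average with this key, here is a minimal plain-Python emulation of a Spark RANGE frame (not Spark itself; the helper range_frame_avg is hypothetical, and the data is the five rows above). Because epoch_time adds timeStamp as seconds, rangeBetween(-days(3), 0) spans 259,200 seconds and reaches back past every earlier row, so each row would get a running average instead.

```python
from statistics import mean


def range_frame_avg(keys, values, lower, upper):
    """Hypothetical helper: plain-Python emulation of a Spark RANGE frame,
    i.e. F.avg(...).over(W) with W = ...rangeBetween(lower, upper), for one
    partition. For each row, average the values whose ordering key lies in
    [key + lower, key + upper]."""
    averages = []
    for k in keys:
        in_frame = [v for kk, v in zip(keys, values) if k + lower <= kk <= k + upper]
        averages.append(mean(in_frame))
    return averages


days = lambda i: i * 86400  # days -> seconds, as in the question

epoch = 1579584223                       # epoch value from the posted output
time_steps = [1, 2, 3, 4, 5]             # the timeStamp column
prices = [53.0, 15.0, 57.0, 71.0, 86.0]  # the stockPrice column

# epoch + timeStamp places consecutive rows ONE SECOND apart
keys = [epoch + t for t in time_steps]

# A 3-day range (259200 s) reaches back past every earlier row,
# so each row gets a running (cumulative) average, not a 3-row average.
print([round(a, 2) for a in range_frame_avg(keys, prices, -days(3), 0)])
# prints [53.0, 34.0, 41.67, 49.0, 56.4]
```

The frame bound is measured in the units of the ordering key, so the same helper with lower=-2 gives [53.0, 34.0, 41.67, 47.67, 71.33].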

This is my result:

+-------+---------+----------+-------------------+----------+----------+-----------------+
|stockId|timeStamp|stockPrice|  current_timestamp|     epoch|epoch_time|       moving_avg|
+-------+---------+----------+-------------------+----------+----------+-----------------+
|    101|        1|      53.0|2020-01-21 10:53:43|1579584223|1579584224|48.21782178217822|
|    101|        2|      15.0|2020-01-21 10:53:43|1579584223|1579584225|48.21782178217822|
|    101|        3|      57.0|2020-01-21 10:53:43|1579584223|1579584226|48.21782178217822|
|    101|        4|      86.0|2020-01-21 10:53:43|1579584223|1579584227|48.21782178217822|

Expected output:

+-------+---------+----------+-------------------+----------+----------+-----------------+
|stockId|timeStamp|stockPrice|  current_timestamp|     epoch|epoch_time|       moving_avg|
+-------+---------+----------+-------------------+----------+----------+-----------------+
|    101|        3|      57.0|2020-01-21 10:53:43|1579584223|1579584226|            41.67|
|    101|        4|      71.0|2020-01-21 10:53:43|1579584223|1579584227|            47.67|
|    101|        5|      86.0|2020-01-21 10:53:43|1579584223|1579584228|            71.33|
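The expected moving_avg values are plain three-row means: each row averages stockPrice for its own timeStamp and the two preceding ones, using the prices from the base dataframe. Here p_t denotes stockPrice at timeStamp t (notation assumed for illustration):

```latex
\begin{aligned}
\bar{p}_t &= \tfrac{1}{3}\,(p_{t-2} + p_{t-1} + p_t) \\
\bar{p}_3 &= \tfrac{53 + 15 + 57}{3} = \tfrac{125}{3} \approx 41.67 \\
\bar{p}_4 &= \tfrac{15 + 57 + 71}{3} = \tfrac{143}{3} \approx 47.67 \\
\bar{p}_5 &= \tfrac{57 + 71 + 86}{3} = \tfrac{214}{3} \approx 71.33
\end{aligned}
```

Rows 1 and 2 have fewer than two predecessors, so they have no complete three-row window, which is why they are absent from the expected output.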

【Comments】:

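  • What is days?
  • It's a lambda that converts days to seconds (24*60*60 = 86400 per day).
  • Thanks. On what basis are the first two rows not needed?
  • I kept it that way to make it easier to understand. But the rolling window spans 3 days.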

Tags: apache-spark pyspark apache-spark-sql spark-streaming pyspark-sql


【Solution 1】:
W=Window.partitionBy(F.col('stockId')).orderBy(F.col('epoch_time').cast("timestamp").cast("long")).rangeBetween(-2,0)

+-------+---------+----------+-------------------+----------+----------+-----------------+
|stockId|timeStamp|stockPrice|  current_timestamp|     epoch|epoch_time|       moving_avg|
+-------+---------+----------+-------------------+----------+----------+-----------------+
|    101|        3|      57.0|2020-01-21 10:53:43|1579584223|1579584226|            41.67|
|    101|        4|      71.0|2020-01-21 10:53:43|1579584223|1579584227|            47.67|
|    101|        5|      86.0|2020-01-21 10:53:43|1579584223|1579584228|            71.33|

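With the window changed to rangeBetween(-2, 0), each row's average covers the current row and the two preceding rows. Because epoch_time advances by one second per timeStamp step, a range of -2 reaches back exactly two rows, so the first full three-row average appears at the 3rd row of the table. Rows 1 and 2 still receive averages over fewer rows and have to be filtered out to match the expected output.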
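Below is a plain-Python emulation (not Spark) of what the answer's rangeBetween(-2, 0) frame computes on epoch_time, plus a frame row count used to keep only complete three-row windows. The filtering step is an assumption added here, not part of the answer; in PySpark it could be a second column such as F.count("stockPrice").over(W) followed by a filter on that count equal to 3. The helper name rolling_rows is hypothetical.

```python
# Plain-Python emulation (not Spark) of the answer's window plus a row count:
#   W = Window.partitionBy("stockId").orderBy(...epoch_time...).rangeBetween(-2, 0)
#   moving_avg = F.avg("stockPrice").over(W); n = F.count("stockPrice").over(W)
epoch = 1579584223
rows = list(zip([1, 2, 3, 4, 5], [53.0, 15.0, 57.0, 71.0, 86.0]))  # (timeStamp, stockPrice)


def rolling_rows(rows, lower=-2, upper=0):
    """Hypothetical helper: returns (timeStamp, stockPrice, moving_avg, n) per row,
    where the frame holds rows whose epoch_time lies in [key + lower, key + upper]."""
    out = []
    for t, p in rows:
        key = epoch + t  # epoch_time = epoch + timeStamp (seconds)
        frame = [pp for tt, pp in rows if key + lower <= epoch + tt <= key + upper]
        out.append((t, p, round(sum(frame) / len(frame), 2), len(frame)))
    return out


# Rows 1 and 2 only see 1 and 2 rows, so keep complete 3-row windows only.
full = [r for r in rolling_rows(rows) if r[3] == 3]
for t, p, avg, n in full:
    print(t, p, avg)
# prints:
# 3 57.0 41.67
# 4 71.0 47.67
# 5 86.0 71.33
```

This only works because the steps happen to be one second apart. If timeStamp is really a day index, a variant worth testing is to add F.col("timeStamp") * 86400 to epoch and use rangeBetween(-days(2), 0), so the frame spans three days (the current day and the two before it) in real seconds.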

