【问题标题】:pyspark: rolling average using timeseries datapyspark:使用时间序列数据的滚动平均值
【发布时间】:2018-01-30 01:33:17
【问题描述】:

我有一个由时间戳列和美元列组成的数据集。我想找到以每行时间戳结束的每周平均美元数。我最初查看的是 pyspark.sql.functions.window 函数,但它按周对数据进行分类。

这是一个例子:

%pyspark
import datetime
from pyspark.sql import functions as F

df1 = sc.parallelize([(17,"2017-03-11T15:27:18+00:00"), (13,"2017-03-11T12:27:18+00:00"), (21,"2017-03-17T11:27:18+00:00")]).toDF(["dollars", "datestring"])
df2 = df1.withColumn('timestampGMT', df1.datestring.cast('timestamp'))

w = df2.groupBy(F.window("timestampGMT", "7 days")).agg(F.avg("dollars").alias('avg'))
w.select(w.window.start.cast("string").alias("start"), w.window.end.cast("string").alias("end"), "avg").collect()

这会产生两条记录:

|        start        |          end         | avg |
|---------------------|----------------------|-----|
|'2017-03-16 00:00:00'| '2017-03-23 00:00:00'| 21.0|
|---------------------|----------------------|-----|
|'2017-03-09 00:00:00'| '2017-03-16 00:00:00'| 15.0|
|---------------------|----------------------|-----|

窗口函数将时间序列数据分箱,而不是执行滚动平均。

有没有一种方法可以执行滚动平均,我将获得每行的每周平均值,时间段以该行的时间戳 GMT 结束?

编辑:

下面张的回答接近我想要的,但不是我想看到的。

这里有一个更好的例子来展示我想要达到的目的:

%pyspark
from pyspark.sql import functions as F
df = spark.createDataFrame([(17, "2017-03-10T15:27:18+00:00"),
                        (13, "2017-03-15T12:27:18+00:00"),
                        (25, "2017-03-18T11:27:18+00:00")],
                        ["dollars", "timestampGMT"])
df = df.withColumn('timestampGMT', df.timestampGMT.cast('timestamp'))
df = df.withColumn('rolling_average', F.avg("dollars").over(Window.partitionBy(F.window("timestampGMT", "7 days"))))

这会产生以下数据框:

dollars timestampGMT            rolling_average
25      2017-03-18 11:27:18.0   25
17      2017-03-10 15:27:18.0   15
13      2017-03-15 12:27:18.0   15

我希望在 timestampGMT 列中的日期前一周的平均值,这将导致:

dollars timestampGMT            rolling_average
17      2017-03-10 15:27:18.0   17
13      2017-03-15 12:27:18.0   15
25      2017-03-18 11:27:18.0   19

在上述结果中,2017-03-10 的 rolling_average 为 17,因为没有之前的记录。 2017 年 3 月 15 日的 rolling_average 为 15,因为它是 2017 年 3 月 15 日的 13 和 2017 年 3 月 10 日的 17 的平均值,落在前 7 天的窗口内。 2017 年 3 月 18 日的滚动平均值为 19,因为它是 2017 年 3 月 18 日的 25 和 2017 年 3 月 10 日的 13 的平均值,该平均值落在前 7 天窗口内,不包括 2017 年的 17 -03-10,因为这不属于前 7 天的窗口。

有没有办法做到这一点,而不是每周窗口不重叠的分箱窗口?

【问题讨论】:

    标签: apache-spark pyspark window-functions moving-average


    【解决方案1】:

    我找到了使用这个 stackoverflow 计算移动/滚动平均值的正确方法:

    Spark Window Functions - rangeBetween dates

    基本思想是将时间戳列转换为秒,然后可以使用 pyspark.sql.Window 类中的 rangeBetween 函数在窗口中包含正确的行。

    这是解决的例子:

    %pyspark
    from pyspark.sql import functions as F
    from pyspark.sql.window import Window
    
    
    #function to calculate number of seconds from number of days
    days = lambda i: i * 86400
    
    df = spark.createDataFrame([(17, "2017-03-10T15:27:18+00:00"),
                            (13, "2017-03-15T12:27:18+00:00"),
                            (25, "2017-03-18T11:27:18+00:00")],
                            ["dollars", "timestampGMT"])
    df = df.withColumn('timestampGMT', df.timestampGMT.cast('timestamp'))
    
    #create window by casting timestamp to long (number of seconds)
    w = (Window.orderBy(F.col("timestampGMT").cast('long')).rangeBetween(-days(7), 0))
    
    df = df.withColumn('rolling_average', F.avg("dollars").over(w))
    

    这会得到我正在寻找的滚动平均值的确切列:

    dollars   timestampGMT            rolling_average
    17        2017-03-10 15:27:18.0   17.0
    13        2017-03-15 12:27:18.0   15.0
    25        2017-03-18 11:27:18.0   19.0
    

    【讨论】:

    • 如果你有一个完整的连续日期列,那么你可以使用rowsBetween(-7,0)
    • 这使用了window 函数,该函数将数据框强制为单个节点。如果它是一个非常大的数据框,则会遇到内存问题。有没有办法使用 rangeBetween 但利用 spark 数据帧的分布式计算?
    • 来自下面的文档rangeBetween(start, end) 创建一个 WindowSpec,定义了从开始(包含)到结束(包含)的框架边界。所以在上面的代码中rangeBetween(-days(7), 0))应该是rangeBetween(-days(7)+1, 0))link
    【解决方案2】:

    我将添加一个我个人认为非常有用的变体。我希望有人会发现它也很有用:

    如果要分组,则在各自的组内计算移动平均值:

    数据框示例:

    from pyspark.sql.window import Window
    from pyspark.sql import functions as func
    
    
    df = spark.createDataFrame([("tshilidzi", 17.00, "2018-03-10T15:27:18+00:00"), 
      ("tshilidzi", 13.00, "2018-03-11T12:27:18+00:00"),   
      ("tshilidzi", 25.00, "2018-03-12T11:27:18+00:00"), 
      ("thabo", 20.00, "2018-03-13T15:27:18+00:00"), 
      ("thabo", 56.00, "2018-03-14T12:27:18+00:00"), 
      ("thabo", 99.00, "2018-03-15T11:27:18+00:00"), 
      ("tshilidzi", 156.00, "2019-03-22T11:27:18+00:00"), 
      ("thabo", 122.00, "2018-03-31T11:27:18+00:00"), 
      ("tshilidzi", 7000.00, "2019-04-15T11:27:18+00:00"),
      ("ash", 9999.00, "2018-04-16T11:27:18+00:00") 
      ],
      ["name", "dollars", "timestampGMT"])
    
    # we need this timestampGMT as seconds for our Window time frame
    df = df.withColumn('timestampGMT', df.timestampGMT.cast('timestamp'))
    
    df.show(10000, False)
    

    输出:

    +---------+-------+---------------------+
    |name     |dollars|timestampGMT         |
    +---------+-------+---------------------+
    |tshilidzi|17.0   |2018-03-10 17:27:18.0|
    |tshilidzi|13.0   |2018-03-11 14:27:18.0|
    |tshilidzi|25.0   |2018-03-12 13:27:18.0|
    |thabo    |20.0   |2018-03-13 17:27:18.0|
    |thabo    |56.0   |2018-03-14 14:27:18.0|
    |thabo    |99.0   |2018-03-15 13:27:18.0|
    |tshilidzi|156.0  |2019-03-22 13:27:18.0|
    |thabo    |122.0  |2018-03-31 13:27:18.0|
    |tshilidzi|7000.0 |2019-04-15 13:27:18.0|
    |ash      |9999.0 |2018-04-16 13:27:18.0|
    +---------+-------+---------------------+
    

    要根据name 计算移动平均值并仍然保持所有行:

    #create window by casting timestamp to long (number of seconds)
    w = (Window()
         .partitionBy(col("name"))
         .orderBy(F.col("timestampGMT").cast('long'))
         .rangeBetween(-days(7), 0))
    
    df2 = df.withColumn('rolling_average', F.avg("dollars").over(w))
    
    df2.show(100, False)
    

    输出:

    +---------+-------+---------------------+------------------+
    |name     |dollars|timestampGMT         |rolling_average   |
    +---------+-------+---------------------+------------------+
    |ash      |9999.0 |2018-04-16 13:27:18.0|9999.0            |
    |tshilidzi|17.0   |2018-03-10 17:27:18.0|17.0              |
    |tshilidzi|13.0   |2018-03-11 14:27:18.0|15.0              |
    |tshilidzi|25.0   |2018-03-12 13:27:18.0|18.333333333333332|
    |tshilidzi|156.0  |2019-03-22 13:27:18.0|156.0             |
    |tshilidzi|7000.0 |2019-04-15 13:27:18.0|7000.0            |
    |thabo    |20.0   |2018-03-13 17:27:18.0|20.0              |
    |thabo    |56.0   |2018-03-14 14:27:18.0|38.0              |
    |thabo    |99.0   |2018-03-15 13:27:18.0|58.333333333333336|
    |thabo    |122.0  |2018-03-31 13:27:18.0|122.0             |
    +---------+-------+---------------------+------------------+
    

    【讨论】:

      【解决方案3】:

      值得注意的是,如果您不关心确切的日期 - 但关心最近 30 天的平均值,您可以使用 rowsBetween 函数,如下所示:

      w = Window.orderBy('timestampGMT').rowsBetween(-7, 0)
      
      df = eurPrices.withColumn('rolling_average', F.avg('dollars').over(w))
      

      由于您按日期排序,因此需要最后 7 次出现。 你保存所有的演员表。

      【讨论】:

        【解决方案4】:

        你是这个意思吗:

        df = spark.createDataFrame([(17, "2017-03-11T15:27:18+00:00"),
                                    (13, "2017-03-11T12:27:18+00:00"),
                                    (21, "2017-03-17T11:27:18+00:00")],
                                   ["dollars", "timestampGMT"])
        df = df.withColumn('timestampGMT', df.timestampGMT.cast('timestamp'))
        df = df.withColumn('rolling_average', f.avg("dollars").over(Window.partitionBy(f.window("timestampGMT", "7 days"))))
        

        输出:

        +-------+-------------------+---------------+                                   
        |dollars|timestampGMT       |rolling_average|
        +-------+-------------------+---------------+
        |21     |2017-03-17 19:27:18|21.0           |
        |17     |2017-03-11 23:27:18|15.0           |
        |13     |2017-03-11 20:27:18|15.0           |
        +-------+-------------------+---------------+
        

        【讨论】:

        • 谢谢张,这更接近我想要的,但不完全是我想要的。您的代码仍在通过日期分箱计算答案。我希望每个每周平均值都在行中的日期结束。没有做一个很好的例子是我的错。我将使用更新的示例来编辑我的帖子,显示我想要的内容。
        猜你喜欢
        • 2019-11-18
        • 1970-01-01
        • 2017-08-03
        • 1970-01-01
        • 2018-09-23
        • 2020-03-12
        • 2014-02-17
        • 1970-01-01
        • 2020-09-11
        相关资源
        最近更新 更多