【问题标题】:pyspark calculating moving average of the difference between current time and last active timepyspark计算当前时间和上次活动时间之间差异的移动平均值
【发布时间】:2019-02-15 19:39:23
【问题描述】:

我有一些这样的记录。

A    B
1    2018-12-25
2    2019-01-15
1    2019-01-20
3    2018-01-01
2    2019-01-01
4    2018-04-09
3    2018-11-08
1    2018-03-20

我想要得到的是这样的东西。 第一步,在组内按升序排列。 (不需要A下单)

A    B
1    2018-03-20
1    2018-12-25
1    2019-01-20
3    2018-01-01
3    2018-11-08
2    2019-01-01
2    2019-01-15
4    2018-04-09

第二步,获取组内连续行之间的时间差。

A    B            C
1    2018-03-20   NaN
1    2018-12-25   280
1    2019-01-20   26
3    2018-01-01   NaN
3    2018-11-08   311
2    2019-01-01   NaN
2    2019-01-15   14
4    2018-04-09   NaN

第三步,得到窗口大小为 2 的 C 的移动平均值。(因为我只提供了很少的行作为例子,为了方便,就选择大小 2)

A    B            C     moving_avg
1    2018-03-20   NaN   NaN
1    2018-12-25   280   280
1    2019-01-20   26    153
3    2018-01-01   NaN   NaN
3    2018-11-08   311   311
2    2019-01-01   NaN   NaN
2    2019-01-15   14    14
4    2018-04-09   NaN   NaN

如果 Windows 函数可以处理这种情况,该解决方案实际上不需要生成 C 列。我列出每个步骤只是为了确保您可以清楚地了解问题所在。

结果集如下所示

A    B            moving_avg
1    2018-03-20   NaN
1    2018-12-25   280
1    2019-01-20   153
3    2018-01-01   NaN
3    2018-11-08   311
2    2019-01-01   NaN
2    2019-01-15   14
4    2018-04-09   NaN

注意:这是在 pyspark 上并使用数据框。不在 Python 上使用 Pandas。

非常感谢!

【问题讨论】:

    标签: python pyspark moving-average


    【解决方案1】:

    文档: windows

    文档: lag

    # Creating a Dataframe
    from pyspark.sql.window import Window
    from pyspark.sql.functions import col, to_date, lag, datediff, when, udf
    df = sqlContext.createDataFrame([(1,'2018-12-25'),(2,'2019-01-15'),(1,'2019-01-20'),(3,'2018-01-01'),
                                     (2,'2019-01-01'),(4,'2018-04-09'),(3,'2018-11-08'),(1,'2018-03-20')],
                                     ['A','B'])
    df = df.withColumn('B',to_date(col('B'), 'yyyy-MM-dd'))
    
    # Using window and lag functions to find the value from previous row
    my_window = Window.partitionBy('A').orderBy('A','B')
    
    # Creating a UDF to calculate average of window sized 2.
    def row_avg(c1,c2):
        count_non_null = 2
        total = 0
        if c1 == None:
            c1 = 0
            count_non_null = count_non_null - 1
        if c2 == None:
            c2 = 0
            count_non_null = count_non_null - 1
        if count_non_null == 0:
            return None
        else:
            return int((c1+c2)/count_non_null)
    
    row_avg = udf(row_avg)
    
    df = df.withColumn('B_Lag_1', lag(col('B'),1).over(my_window))\
           .withColumn('C', datediff(col('B'),col('B_Lag_1'))).drop('B_Lag_1')\
           .withColumn('C_Lag_1', lag(col('C'),1).over(my_window))\
           .withColumn('moving_avg',row_avg(col('C'),col('C_Lag_1'))).drop('C','C_Lag_1')
    df.show()
    +---+----------+----------+
    |  A|         B|moving_avg|
    +---+----------+----------+
    |  1|2018-03-20|      null|
    |  1|2018-12-25|       280|
    |  1|2019-01-20|       153|
    |  3|2018-01-01|      null|
    |  3|2018-11-08|       311|
    |  2|2019-01-01|      null|
    |  2|2019-01-15|        14|
    |  4|2018-04-09|      null|
    +---+----------+----------+
    

    【讨论】:

      【解决方案2】:

      可能有更聪明的方法来实现这一点,但您也可以使用 RDD :

      from operator import add
      from numpy import mean
      from datetime import datetime
      
      data = [(1, "2018-12-25"), (2, "2019-01-15"), (1, "2019-01-20"), (3, "2018-01-01"),
              (2, "2019-01-01"), (4, "2018-04-09"), (3, "2018-11-08"), (1, "2018-03-20")]
      data = sc.parallelize(data).mapValues(lambda v: [datetime.strptime(v, "%Y-%m-%d")]).reduceByKey(add)
      
      def computeMvgAvg(values):
      sorted_date = sorted(values)
      diffs = []
      mvg_avg = []
      for i in range(1, len(sorted_date)):
          diffs.append(int((sorted_date[i] - sorted_date[i-1]).total_seconds()/86400))
          mvg_avg.append(int(mean(diffs)))
      diffs = [None] + diffs
      mvg_avg = [None] + mvg_avg
      return zip(sorted_date, diffs, mvg_avg)
      
      sch = StructType([
         StructField("A", StringType(), True),
         StructField("B", DateType(), True),
         StructField("C", IntegerType(), True),
         StructField("moving_avg", IntegerType(), True)
      ])
      data.flatMapValues(myMapValues).map(lambda row: [row[0]] + list(row[1])).toDF(schema=sch).show()
      
      +---+----------+----+----------+
      |  A|         B|   C|moving_avg|
      +---+----------+----+----------+
      |  1|2018-03-20|null|      null|
      |  1|2018-12-25| 280|       280|
      |  1|2019-01-20|  26|       153|
      |  2|2019-01-01|null|      null|
      |  2|2019-01-15|  14|        14|
      |  3|2018-01-01|null|      null|
      |  3|2018-11-08| 311|       311|
      |  4|2018-04-09|null|      null|
      +---+----------+----+----------+
      

      【讨论】:

        猜你喜欢
        • 2021-09-30
        • 1970-01-01
        • 2015-05-25
        • 1970-01-01
        • 1970-01-01
        • 1970-01-01
        • 1970-01-01
        • 1970-01-01
        • 2012-04-26
        相关资源
        最近更新 更多