【Question title】: Conditional average partitioned by
【Posted】: 2020-12-05 20:09:43
【Question】:

I have:

from pyspark.sql import functions as F
from pyspark.sql.window import Window


df = spark.createDataFrame([(17, "2017-03-10T15:27:18+00:00",'Store 1'),
                            (13, "2017-04-15T12:27:18+00:00",'Store 1'),
                            (25, "2017-05-18T11:27:18+00:00",'Store 1'),
                            (18, "2017-05-19T11:27:18+00:00",'Store 1'),
                            (13, "2017-03-15T12:27:18+00:00",'Store 2'),
                            (25, "2017-05-18T11:27:18+00:00",'Store 2'),
                            (25, "2017-08-18T11:27:18+00:00",'Store 2')],
                        ["dollars", "timestampGMT",'Store'])
df = df.withColumn('timestampGMT', df.timestampGMT.cast('timestamp'))
dollars timestampGMT    Store
17  2017-03-10 15:27:18 Store 1
13  2017-04-15 12:27:18 Store 1
25  2017-05-18 11:27:18 Store 1
18  2017-05-19 11:27:18 Store 1
13  2017-03-15 12:27:18 Store 2
25  2017-05-18 11:27:18 Store 2
25  2017-08-18 11:27:18 Store 2

I want the average over the last 3 months (if data exists for each of the last 3 months, otherwise 0), computed per store. The expected result:

dollars timestampGMT    Store   Last_3_months_Average
17  2017-03-10 15:27:18 Store 1 0
13  2017-04-15 12:27:18 Store 1 0
25  2017-05-18 11:27:18 Store 1 18.25
18  2017-05-19 11:27:18 Store 1 18.25
13  2017-03-15 12:27:18 Store 2 0
25  2017-05-18 11:27:18 Store 2 0
25  2017-08-18 11:27:18 Store 2 0
25  2017-08-19 11:27:18 Store 2 0

I'm not sure how to approach this. Should I group by month first?

【Comments】:

Tags: python pyspark


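【Solution 1】:

Try this. The idea is to turn each timestamp into a month index, flag the months whose two preceding months also have data for that store, and average over a 3-month range window only for those months.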

import pyspark.sql.functions as f
from pyspark.sql import Window

# w1 orders each store's months; w2 covers the current month and the two months before it.
w1 = Window.partitionBy('Store').orderBy('month')
w2 = Window.partitionBy('Store').orderBy('month').rangeBetween(-2, 0)

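# Month index: months counted from the year 2000, so consecutive calendar months differ by exactly 1.
df2 = df.withColumn('month', (f.year('timestampGMT') - 2000) * 12 + f.month('timestampGMT'))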
df2.show(10, False)
+-------+-------------------+-------+-----+
|dollars|timestampGMT       |Store  |month|
+-------+-------------------+-------+-----+
|17     |2017-03-10 15:27:18|Store 1|207  |
|13     |2017-04-15 12:27:18|Store 1|208  |
|25     |2017-05-18 11:27:18|Store 1|209  |
|18     |2017-05-19 11:27:18|Store 1|209  |
|13     |2017-03-15 12:27:18|Store 2|207  |
|25     |2017-05-18 11:27:18|Store 2|209  |
|25     |2017-08-18 11:27:18|Store 2|212  |
+-------+-------------------+-------+-----+

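# Over each store's distinct months, look two rows back. A month has a full 3-month
# history only if that earlier row is exactly 2 months before it (no gaps).
df3 = df2.select('Store', 'month').distinct() \
        .withColumn('lag', f.lag('month', 2, 0).over(w1)) \
        .withColumn('condition', f.when(f.col('month') == f.col('lag') + 2, True).otherwise(False))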
df3.show(10, False)

+-------+-----+---+---------+
|Store  |month|lag|condition|
+-------+-----+---+---------+
|Store 1|207  |0  |false    |
|Store 1|208  |0  |false    |
|Store 1|209  |207|true     |
|Store 2|207  |0  |false    |
|Store 2|209  |0  |false    |
|Store 2|212  |207|false    |
+-------+-----+---+---------+

# Attach the flag to every row and average over the 3-month window only where it is true.
df2.join(df3, ['Store', 'month'], 'inner') \
  .withColumn('avg', f.when(f.col('condition'), f.avg('dollars').over(w2)).otherwise(0)) \
  .show(10, False)

+-------+-----+-------+-------------------+---+---------+-----+
|Store  |month|dollars|timestampGMT       |lag|condition|avg  |
+-------+-----+-------+-------------------+---+---------+-----+
|Store 1|207  |17     |2017-03-10 15:27:18|0  |false    |0.0  |
|Store 1|208  |13     |2017-04-15 12:27:18|0  |false    |0.0  |
|Store 1|209  |25     |2017-05-18 11:27:18|207|true     |18.25|
|Store 1|209  |18     |2017-05-19 11:27:18|207|true     |18.25|
|Store 2|207  |13     |2017-03-15 12:27:18|0  |false    |0.0  |
|Store 2|209  |25     |2017-05-18 11:27:18|0  |false    |0.0  |
|Store 2|212  |25     |2017-08-18 11:27:18|207|false    |0.0  |
+-------+-----+-------+-------------------+---+---------+-----+
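
The rule used above (a month qualifies only when that store has data in the month itself and in both preceding months) can be checked without a Spark session. The following is a minimal plain-Python sketch of the same logic, not a replacement for the PySpark code; the helper names `month_index` and `last_3_months_average` are hypothetical and only serve the illustration.

```python
from collections import defaultdict
from datetime import datetime

# Same sample data as in the question.
rows = [
    (17, "2017-03-10T15:27:18+00:00", "Store 1"),
    (13, "2017-04-15T12:27:18+00:00", "Store 1"),
    (25, "2017-05-18T11:27:18+00:00", "Store 1"),
    (18, "2017-05-19T11:27:18+00:00", "Store 1"),
    (13, "2017-03-15T12:27:18+00:00", "Store 2"),
    (25, "2017-05-18T11:27:18+00:00", "Store 2"),
    (25, "2017-08-18T11:27:18+00:00", "Store 2"),
]


def month_index(ts):
    """Month number counted from the year 2000, mirroring the 'month' column above."""
    return (ts.year - 2000) * 12 + ts.month


def last_3_months_average(rows):
    """Return (store, month, dollars, avg) per row; avg is 0.0 unless the store
    has data in the row's month and in both preceding months."""
    # Collect dollar amounts per (store, month index).
    by_store_month = defaultdict(list)
    for dollars, ts, store in rows:
        key = (store, month_index(datetime.fromisoformat(ts)))
        by_store_month[key].append(dollars)

    result = []
    for dollars, ts, store in rows:
        m = month_index(datetime.fromisoformat(ts))
        window = [m - 2, m - 1, m]
        if all((store, k) in by_store_month for k in window):
            values = [d for k in window for d in by_store_month[(store, k)]]
            avg = sum(values) / len(values)
        else:
            avg = 0.0
        result.append((store, m, dollars, avg))
    return result


for row in last_3_months_average(rows):
    print(row)
```

Only the two May rows of Store 1 get a non-zero average, because Store 1 is the only store with data in three consecutive months (207, 208, 209).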

【Comments】:
