【问题标题】:Mean of following n row Pyspark window跟随 n 行 Pyspark 窗口的平均值
【发布时间】:2022-01-01 12:21:26
【问题描述】:

我需要为每个 ID 和行计算最后 3 个时期的平均值(不包括当前时期(我将输入相同的值)并管理该时期没有其他 3 个次要时期的情况)。

例如:

输入:

ID $ TIME
1 100 20/11/2021
1 200 17/11/2021
1 150 15/11/2021
1 170 10/11/2021
1 130 05/11/2021
2 200 20/11/2021
2 200 17/11/2021

输出:

ID $ TIME MEAN ($)
1 100 20/11/2021 (200+150+170)/3 = 173.33*
1 200 17/11/2021 (150+170+130)/3 = 150
1 150 15/11/2021 ...
1 170 10/11/2021 ...
1 130 05/11/2021 ...
... ... ... ...
2 200 20/11/2021 200
2 150 17/11/2021 ...

是否有一个窗口函数来制作它?我希望我不必使用循环:) 感谢您的宝贵帮助!

【问题讨论】:

  • 为什么第 1 行的平均值等于 100?不应该是 (200+150+170)/3 吗?
  • 我错了……现在我已经更正了!谢谢你
  • 每组的最后三行应该和$一样,对吗?
  • 是的,正确!你能帮我吗? :)

标签: python pyspark window rows between


【解决方案1】:

我找到了一个使用 window functionsapplyInPandas 方法的解决方案,从 Spark 3.0.0 开始,该方法允许对 PySpark 数据帧的每组执行 Pandas UDF。

import pandas as pd
import pyspark.sql.functions as F
from pyspark.sql.window import Window
from pyspark.sql.types import *



# transform column as date
df = df.withColumn('TIME', F.to_date(F.col('TIME'), format='dd/MM/yyyy'))



# define window function to generate moving average
w = Window().partitionBy('ID').orderBy(F.desc('TIME')).rowsBetween(1, 3)



# define function and schema for applyInPandas
def dollars_last_rows(pdf):
  N = 3
  pdf.iloc[-N:]['MEAN_DOLLARS'] = pdf.iloc[-N:]['DOLLARS'].astype('float')
  return pdf

schema = StructType([
  StructField('ID', LongType(), True),
  StructField('DOLLARS', LongType(), True),
  StructField('TIME', DateType(), True),
  StructField('MEAN_DOLLARS', DoubleType(), True)
])



df \
  .withColumn('MEAN_DOLLARS', F.mean('DOLLARS').over(w)) \
  .groupby('ID') \
  .applyInPandas(dollars_last_rows, schema) \
  .show()

【讨论】:

    【解决方案2】:

    一种解决方案是使用本机 window 分析功能和 when and otherwise。这不使用任何额外的 UDF。

    
    from pyspark.sql import functions as F
    from pyspark.sql import Window, Column
    
    
    data = [(1, 100, "20/11/2021",),
            (1, 200, "17/11/2021",),
            (1, 150, "15/11/2021",),
            (1, 170, "10/11/2021",),
            (1, 130, "05/11/2021",),
            (2, 200, "20/11/2021",),
            (2, 200, "17/11/2021",), ]
    
    df = spark.createDataFrame(data, ("ID", "$", "TIME",)).withColumn("TIME", F.to_date(F.col("TIME"), "dd/MM/yyyy"))
    
    def mean_dollars() -> Column:
        window_spec = Window.partitionBy("ID").orderBy(F.desc("TIME")).rowsBetween(1, 3)
        sum_preceeding_3 = F.sum("$").over(window_spec)
        count = F.coalesce(F.count("$").over(window_spec), F.lit(0))
        return F.when(count < 3, F.col("$")).otherwise(sum_preceeding_3 / count)
    
    (df.withColumn("MEAN ($)", mean_dollars()).show())
    
    

    输出

    +---+---+----------+------------------+
    | ID|  $|      TIME|          MEAN ($)|
    +---+---+----------+------------------+
    |  1|100|2021-11-20|173.33333333333334|
    |  1|200|2021-11-17|             150.0|
    |  1|150|2021-11-15|             150.0|
    |  1|170|2021-11-10|             170.0|
    |  1|130|2021-11-05|             130.0|
    |  2|200|2021-11-20|             200.0|
    |  2|200|2021-11-17|             200.0|
    +---+---+----------+------------------+
    

    【讨论】:

      猜你喜欢
      • 2011-10-17
      • 2020-05-09
      • 1970-01-01
      • 1970-01-01
      • 2016-05-26
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 2015-02-04
      相关资源
      最近更新 更多