【问题标题】:Pyspark Dataframe: Get previous row that meets a conditionPyspark Dataframe:获取满足条件的上一行
【发布时间】:2018-09-06 07:57:21
【问题描述】:

对于 PySpark DataFrame 中的每一行,我都试图从满足特定条件的第一行中获取一个值:

如果我的数据框看起来像这样:

X  | Flag
1  | 1
2  | 0
3  | 0
4  | 0
5  | 1
6  | 0
7  | 0
8  | 0
9  | 1
10 | 0

我想要如下所示的输出:

X  | Lag_X | Flag
1  | NULL  | 1
2  | 1     | 0
3  | 1     | 0
4  | 1     | 0
5  | 1     | 1
6  | 5     | 0
7  | 5     | 0
8  | 5     | 0
9  | 5     | 1
10 | 9     | 0

我认为我可以使用延迟函数和 WindowSpec 来做到这一点,不幸的是 WindowSpec 不支持 .filter.when,所以这不起作用:

conditional_window = Window().orderBy(X).filter(df[Flag] == 1)
df = df.withColumn('lag_x', f.lag(df[x],1).over(conditional_window)

看起来这应该很简单,但我一直在绞尽脑汁试图找到解决方案,因此非常感谢您提供任何帮助

【问题讨论】:

    标签: python pyspark spark-dataframe pyspark-sql


    【解决方案1】:

    问题很老,但我认为答案可能对其他人有所帮助

    这是一个使用窗口和滞后函数的工作解决方案

    from pyspark.sql import functions as F
    from pyspark.sql import Window
    from pyspark.sql.functions import when
    from pyspark.context import SparkContext
    
    # Call SparkContext
    sc = SparkContext.getOrCreate()
    sc = sparkContext
    
    # Create DataFrame
    a = sc.createDataFrame([(1, 1), 
                            (2, 0),
                            (3, 0),
                            (4, 0),
                            (5, 1),
                            (6, 0),
                            (7, 0),
                            (8, 0),
                            (9, 1),
                           (10, 0)]
                         , ['X', 'Flag'])
    
    # Use a window function
    win = Window.orderBy("X")
    # Condition : if preceeding row in column "Flag" is not 0
    condition = F.lag(F.col("Flag"), 1).over(win) != 0
    # Add a new column : if condition is true, value is value of column "X" at the previous row
    a = a.withColumn("Flag_X", F.when(condition, F.col("X") - 1))
    

    现在,我们得到一个如下图所示的DataFrame

    +---+----+------+
    |  X|Flag|Flag_X|
    +---+----+------+
    |  1|   1|  null|
    |  2|   0|     1|
    |  3|   0|  null|
    |  4|   0|  null|
    |  5|   1|  null|
    |  6|   0|     5|
    |  7|   0|  null|
    |  8|   0|  null|
    |  9|   1|  null|
    | 10|   0|     9|
    +---+----+------+
    

    填充空值:

    a = a.withColumn("Flag_X", 
                     F.last(F.col("Flag_X"), ignorenulls=True)\
         .over(win))
    

    所以最终的DataFrame是按要求的:

    +---+----+------+
    |  X|Flag|Flag_X|
    +---+----+------+
    |  1|   1|  null|
    |  2|   0|     1|
    |  3|   0|     1|
    |  4|   0|     1|
    |  5|   1|     1|
    |  6|   0|     5|
    |  7|   0|     5|
    |  8|   0|     5|
    |  9|   1|     5|
    | 10|   0|     9|
    +---+----+------+
    

    【讨论】:

      猜你喜欢
      • 2013-08-14
      • 1970-01-01
      • 2018-03-10
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 2020-05-01
      • 1970-01-01
      • 1970-01-01
      相关资源
      最近更新 更多