【问题标题】:Pyspark: Calculate streak of consecutive observationsPyspark:计算连续观察的条纹
【发布时间】:2019-01-30 17:10:53
【问题描述】:

我有一个 Spark (2.4.0) 数据框,其中有一列只有两个值(01)。我需要计算此数据中连续的0s 和1s 的条纹,如果值发生变化,则将条纹重置为零。

一个例子:

from pyspark.sql import (SparkSession, Window)
from pyspark.sql.functions import (to_date, row_number, lead, col)

spark = SparkSession.builder.appName('test').getOrCreate()

# Create dataframe
df = spark.createDataFrame([
    ('2018-01-01', 'John', 0, 0),
    ('2018-01-01', 'Paul', 1, 0),
    ('2018-01-08', 'Paul', 3, 1),
    ('2018-01-08', 'Pete', 4, 0),
    ('2018-01-08', 'John', 3, 0),
    ('2018-01-15', 'Mary', 6, 0),
    ('2018-01-15', 'Pete', 6, 0),
    ('2018-01-15', 'John', 6, 1),
    ('2018-01-15', 'Paul', 6, 1),
], ['str_date', 'name', 'value', 'flag'])

df.orderBy('name', 'str_date').show()
## +----------+----+-----+----+
## |  str_date|name|value|flag|
## +----------+----+-----+----+
## |2018-01-01|John|    0|   0|
## |2018-01-08|John|    3|   0|
## |2018-01-15|John|    6|   1|
## |2018-01-15|Mary|    6|   0|
## |2018-01-01|Paul|    1|   0|
## |2018-01-08|Paul|    3|   1|
## |2018-01-15|Paul|    6|   1|
## |2018-01-08|Pete|    4|   0|
## |2018-01-15|Pete|    6|   0|
## +----------+----+-----+----+

有了这些数据,我想计算连续的零和一的条纹,按日期排序并按名称“加窗”:

# Expected result:
## +----------+----+-----+----+--------+--------+
## |  str_date|name|value|flag|streak_0|streak_1|
## +----------+----+-----+----+--------+--------+
## |2018-01-01|John|    0|   0|       1|       0|
## |2018-01-08|John|    3|   0|       2|       0|
## |2018-01-15|John|    6|   1|       0|       1|
## |2018-01-15|Mary|    6|   0|       1|       0|
## |2018-01-01|Paul|    1|   0|       1|       0|
## |2018-01-08|Paul|    3|   1|       0|       1|
## |2018-01-15|Paul|    6|   1|       0|       2|
## |2018-01-08|Pete|    4|   0|       1|       0|
## |2018-01-15|Pete|    6|   0|       2|       0|
## +----------+----+-----+----+--------+--------+

当然,如果“标志”发生变化,我需要将连胜重置为零。

有没有办法做到这一点?

【问题讨论】:

  • 假设paul2018-01-20 上有一个标志0..你希望streak_0 是2 还是1?
  • @VamsiPrabhala 罢工将是 1,因为需要重置连胜

标签: apache-spark pyspark apache-spark-sql


【解决方案1】:

这需要行数差异方法来首先对具有相同值的连续行进行分组,然后在组之间使用排名方法。

from pyspark.sql import Window 
from pyspark.sql import functions as f
#Windows definition
w1 = Window.partitionBy(df.name).orderBy(df.date)
w2 = Window.partitionBy(df.name,df.flag).orderBy(df.date)

res = df.withColumn('grp',f.row_number().over(w1)-f.row_number().over(w2))
#Window definition for streak
w3 = Window.partitionBy(res.name,res.flag,res.grp).orderBy(res.date)
streak_res = res.withColumn('streak_0',f.when(res.flag == 1,0).otherwise(f.row_number().over(w3))) \
                .withColumn('streak_1',f.when(res.flag == 0,0).otherwise(f.row_number().over(w3)))
streak_res.show()

【讨论】:

    【解决方案2】:

    如果在这种情况下您已经有一个自然排序列 (str_date),那么有一个不使用 row_number() 的更直观的解决方案。

    简而言之,要找到连续 1,只需使用

    1. 标志的累计和,
    2. 然后乘以标志。

    要找到连续的 0,请先反转标志,然后对连续的 1 执行相同操作。

    首先我们定义一个函数来计算累计和:

    from pyspark.sql import Window 
    from pyspark.sql import functions as f
    
    def cum_sum(df, new_col_name, partition_cols, order_col, value_col):
        windowval = (Window.partitionBy(partition_cols).orderBy(order_col)
                 .rowsBetween(Window.unboundedPreceding, 0))
        return df.withColumn(new_col_name, f.sum(value_col).over(windowval))
    

    注意使用rowsBetween(而不是rangeBetween)。当订单列中存在重复值时,这对于获得正确的累积总和是重要

    计算 1 的条数

    df = cum_sum(df, 
                 new_col_name='1_group', 
                 partition_cols='name', 
                 order_col='str_date',
                 value_col='flag')
    df = df.withColumn('streak_1', f.col('flag')*f.col('1_group'))
    

    计算0的条纹

    df = df.withColumn('flag_inverted', 1-f.col('flag'))
    
    df = cum_sum(df, 
                 new_col_name='0_group', 
                 partition_cols='name', 
                 order_col='str_date',
                 value_col='flag_inverted')
    df = df.withColumn('streak_0', f.col('flag_inverted')*f.col('0_group'))
    

    【讨论】:

      猜你喜欢
      • 1970-01-01
      • 2020-03-18
      • 1970-01-01
      • 2023-03-27
      • 2018-08-25
      • 2018-03-20
      • 1970-01-01
      • 2017-04-25
      • 1970-01-01
      相关资源
      最近更新 更多