【Question title】: PySpark: calculate time difference ordered by code
【Posted】: 2019-09-11 07:04:42
【Question】:

I would like to know whether PySpark can calculate the time difference within groups of a dataset. For example, I have

CODE1 | CODE2  | TIME 
00001 |  AAA   | 2019-01-01 14:00:00
00001 |  AAA   | 2019-01-01 14:05:00
00001 |  AAA   | 2019-01-01 14:10:00
00001 |  BBB   | 2019-01-01 14:15:00
00001 |  BBB   | 2019-01-01 14:20:00
00001 |  AAA   | 2019-01-01 14:25:00
00001 |  AAA   | 2019-01-01 14:30:00

What I want is something like

CODE1 | CODE2  | TIME_DIFF
00001 |  AAA   | 10 MINUTES 
00001 |  BBB   | 5 MINUTES
00001 |  AAA   | 5 MINUTES

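The time difference is measured from the first record to the last record of each consecutive run of the same category. The data is already sorted by time. Is this possible?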
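
To pin down the expected result, here is a minimal plain-Python sketch (not PySpark) that reproduces the numbers in the desired output from the sample rows. It assumes the rows are already sorted by time and that a "group" means a run of consecutive rows with the same CODE1 and CODE2. The helper name `run_durations` is hypothetical and used only for illustration.

```python
from datetime import datetime
from itertools import groupby

# Sample rows from the question, already sorted by TIME
rows = [
    ("00001", "AAA", "2019-01-01 14:00:00"),
    ("00001", "AAA", "2019-01-01 14:05:00"),
    ("00001", "AAA", "2019-01-01 14:10:00"),
    ("00001", "BBB", "2019-01-01 14:15:00"),
    ("00001", "BBB", "2019-01-01 14:20:00"),
    ("00001", "AAA", "2019-01-01 14:25:00"),
    ("00001", "AAA", "2019-01-01 14:30:00"),
]


def run_durations(sorted_rows):
    """Return (code1, code2, minutes) for each run of consecutive equal codes."""
    fmt = "%Y-%m-%d %H:%M:%S"
    result = []
    # itertools.groupby only merges adjacent equal keys, so AAA appears twice
    for (code1, code2), run in groupby(sorted_rows, key=lambda r: (r[0], r[1])):
        times = [datetime.strptime(r[2], fmt) for r in run]
        minutes = int((times[-1] - times[0]).total_seconds() // 60)
        result.append((code1, code2, minutes))
    return result


print(run_durations(rows))
```

The key point is that the second AAA block must not be merged with the first one, which is why a plain group-by on CODE2 alone is not enough.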

【Discussion】:

    Tags: pyspark window partition


    【Solution 1】:

    I coded this in a plain, straightforward way. It could be optimized further by using more of Spark's built-in functions.

    >>> df.show()
    +-----+-----+-------------------+
    |CODE1|CODE2|               TIME|
    +-----+-----+-------------------+
    |    1|  AAA|2019-01-01 14:00:00|
    |    1|  AAA|2019-01-01 14:05:00|
    |    1|  AAA|2019-01-01 14:10:00|
    |    1|  BBB|2019-01-01 14:15:00|
    |    1|  BBB|2019-01-01 14:20:00|
    |    1|  AAA|2019-01-01 14:25:00|
    |    1|  AAA|2019-01-01 14:30:00|
    +-----+-----+-------------------+
    
    >>> df.printSchema()
    root
     |-- CODE1: long (nullable = true)
     |-- CODE2: string (nullable = true)
     |-- TIME: string (nullable = true)
    
    >>> from pyspark.sql import functions as F, Window
    #A single global window ordered by TIME (all rows end up in one partition)
    >>> win = Window.partitionBy(F.lit(0)).orderBy('TIME')
    
    #batch_order numbers each run of consecutive rows that share the same CODE2:
    #flag is 1 wherever CODE2 differs from the previous row, and the running sum of flag is the run id
    >>> df_1=df.withColumn('prev_batch', F.lag('CODE2').over(win)) \
    ...   .withColumn('flag', F.when(F.col('CODE2') == F.col('prev_batch'),0).otherwise(1)) \
    ...   .withColumn('batch_order', F.sum('flag').over(win)) \
    ...   .drop('prev_batch', 'flag') \
    ...   .sort('TIME')
    
    >>> df_1.show()
    +-----+-----+-------------------+-----------+
    |CODE1|CODE2|               TIME|batch_order|
    +-----+-----+-------------------+-----------+
    |    1|  AAA|2019-01-01 14:00:00|          1|
    |    1|  AAA|2019-01-01 14:05:00|          1|
    |    1|  AAA|2019-01-01 14:10:00|          1|
    |    1|  BBB|2019-01-01 14:15:00|          2|
    |    1|  BBB|2019-01-01 14:20:00|          2|
    |    1|  AAA|2019-01-01 14:25:00|          3|
    |    1|  AAA|2019-01-01 14:30:00|          3|
    +-----+-----+-------------------+-----------+
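
The way `batch_order` is derived can be easier to follow outside Spark. The following plain-Python sketch mirrors the three steps (previous value, change flag, running sum) on the CODE2 column of the sample data. It is only an illustration of the logic, not part of the original answer, and the variable names are chosen here for readability.

```python
from itertools import accumulate

# CODE2 values in TIME order, taken from the sample data
codes = ["AAA", "AAA", "AAA", "BBB", "BBB", "AAA", "AAA"]

# Step 1 (lag): the previous row's value, None for the first row
prev = [None] + codes[:-1]

# Step 2 (flag): 1 where the value differs from the previous row, else 0
flags = [0 if code == before else 1 for code, before in zip(codes, prev)]

# Step 3 (running sum): the cumulative sum of the flags is the run id
batch_order = list(accumulate(flags))

print(flags)        # [1, 0, 0, 1, 0, 1, 0]
print(batch_order)  # [1, 1, 1, 2, 2, 3, 3]
```

Because the first row has no predecessor, it always gets flag 1, so run ids start at 1.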
    
    #Extract min and max timestamps for each group
    >>> df_max=df_1.groupBy([df_1.batch_order,df_1.CODE2]).agg(F.max("TIME").alias("mx"))
    >>> df_min=df_1.groupBy([df_1.batch_order,df_1.CODE2]).agg(F.min("TIME").alias("mn"))
    >>> df_max.show()
    +-----------+-----+-------------------+
    |batch_order|CODE2|                 mx|
    +-----------+-----+-------------------+
    |          1|  AAA|2019-01-01 14:10:00|
    |          2|  BBB|2019-01-01 14:20:00|
    |          3|  AAA|2019-01-01 14:30:00|
    +-----------+-----+-------------------+
    
    >>> df_min.show()
    +-----------+-----+-------------------+
    |batch_order|CODE2|                 mn|
    +-----------+-----+-------------------+
    |          1|  AAA|2019-01-01 14:00:00|
    |          2|  BBB|2019-01-01 14:15:00|
    |          3|  AAA|2019-01-01 14:25:00|
    +-----------+-----+-------------------+
    
    #join on batch_order and CODE2; passing column names keeps a single copy of each key column
    >>> df_joined=df_max.join(df_min,['batch_order','CODE2'])
    >>> df_joined.show()
    +-----------+-----+-------------------+-------------------+
    |batch_order|CODE2|                 mx|                 mn|
    +-----------+-----+-------------------+-------------------+
    |          1|  AAA|2019-01-01 14:10:00|2019-01-01 14:00:00|
    |          3|  AAA|2019-01-01 14:30:00|2019-01-01 14:25:00|
    |          2|  BBB|2019-01-01 14:20:00|2019-01-01 14:15:00|
    +-----------+-----+-------------------+-------------------+
    
    
    >>> from pyspark.sql.functions import unix_timestamp
    >>> from pyspark.sql.types import IntegerType
    #Difference between the max and min timestamps, in whole minutes
    >>> fmt = 'yyyy-MM-dd HH:mm:ss'
    >>> df_joined.withColumn("diff",((unix_timestamp(df_joined.mx, fmt)-unix_timestamp(df_joined.mn, fmt))/60).cast(IntegerType())).show()
    +-----------+-----+-------------------+-------------------+----+
    |batch_order|CODE2|                 mx|                 mn|diff|
    +-----------+-----+-------------------+-------------------+----+
    |          1|  AAA|2019-01-01 14:10:00|2019-01-01 14:00:00|  10|
    |          3|  AAA|2019-01-01 14:30:00|2019-01-01 14:25:00|   5|
    |          2|  BBB|2019-01-01 14:20:00|2019-01-01 14:15:00|   5|
    +-----------+-----+-------------------+-------------------+----+
    

    【Discussion】:
