【问题标题】:pyspark groupby agg with new col: diff between oldest and newest timetamp带有新列的pyspark groupby agg:最旧和最新时间​​戳之间的差异
【发布时间】:2023-02-16 22:45:45
【问题描述】:

我有包含以下列的 pyspark 数据框:

  • session_id
  • 时间戳
data = [(("ID1", "2021-12-10 10:00:00")), 
        (("ID1", "2021-12-10 10:05:00")),
        (("ID2", "2021-12-10 10:20:00")),
        (("ID2", "2021-12-10 10:24:00")),
        (("ID2", "2021-12-10 10:26:00")),
]

我想对会话进行分组并添加一个名为持续时间的新列,这将是该会话的最旧和最新时间​​戳之间的差异(以秒为单位):

ID1: 300
ID2: 360

如何实现呢?

谢谢,

【问题讨论】:

    标签: pyspark


    【解决方案1】:

    您可以使用像 collect_list 这样的聚合函数,然后对列表执行最大和最小操作。要获得以秒为单位的持续时间,您可以将时间值转换为 unix_timestamp,然后执行差分。

    尝试这个:

    from pyspark.sql.functions import col, min, max as _max, array_max, collect_list, array_min, unix_timestamp
    
    data = [("ID1", "2021-12-10 10:00:00"), 
            ("ID1", "2021-12-10 10:05:00"),
            ("ID2", "2021-12-10 10:20:00"),
            ("ID2", "2021-12-10 10:24:00"),
            ("ID2", "2021-12-10 10:26:00"),
    ]
    df = spark.createDataFrame(data, ['sessionId', 'time']).select('sessionId', col('time').cast('timestamp'))
    
    df2 = df.groupBy('sessionId')
    .agg(
      array_max(collect_list('time')).alias('max_time'), 
      array_min(collect_list('time')).alias('min_time'))
    .withColumn('duration', 
                unix_timestamp('max_time')-unix_timestamp('min_time'))
    df2.show()
    

    【讨论】:

      猜你喜欢
      • 2019-05-06
      • 2021-11-27
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 2021-11-08
      相关资源
      最近更新 更多