【Question title】: Aggregated timestamp with 1 second difference in pyspark
【Posted】: 2020-06-08 17:39:56
【Question description】:

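I have a pyspark DataFrame like the sample below (the raw data has 1.5 records per day [sic]). It holds user data with start-time and end-time columns plus several demographic variables (id, age_group, county, and so on). Many records are only 1 second apart.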

+--------+-------------+---------+-----------------------+-------------------+---------+
|id      | date        | group   |start_time             | end_time          | duration|
+--------+-------------+---------+-----------------------+-------------------+---------+
|    78aa| 2020-04-14  | 3       |    2020-04-14 19:00:00|2020-04-14 19:23:59|24       |
|    78aa| 2020-04-14  | 3       |    2020-04-14 19:24:00|2020-04-14 19:26:59|4        |
|    78aa| 2020-04-14  | 3       |    2020-04-14 19:27:00|2020-04-14 19:35:59|8        |
|    78aa| 2020-04-14  | 3       |    2020-04-14 19:36:00|2020-04-14 19:55:00|19       |
|    25aa| 2020-04-15  | 7       |    2020-04-15 08:00:00|2020-04-15 08:02:59|3        |
|    25aa| 2020-04-15  | 7       |    2020-04-15 11:03:00|2020-04-15 11:11:59|9        |
|    25aa| 2020-04-15  | 7       |    2020-04-15 11:12:00|2020-04-15 11:45:59|34       |
|    25aa| 2020-04-15  | 7       |    2020-04-15 11:46:00|2020-04-15 11:47:00|1        |
+--------+-------------+---------+-----------------------+-------------------+---------+

What I tried: aggregating the data over the whole day

from pyspark.sql import functions as F

# Collapses each (date, id) pair into a single row for the whole day
df = df.groupBy("date", "id").agg(F.first("group"), F.sum("duration"))\
.toDF("date", "id", "group", "duration")

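I also need to transform the DataFrame to a per-user aggregation within the day, merging the records that follow one another by 1 second. How can I do this with pyspark? I don't want to convert my data to a pandas DataFrame, because pandas loads the data into the driver's memory and I would run into memory problems. This is the desired output: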

+--------+--------------+------+-----------------------+-------------------+---------+
|id      |  date        |group |start_time             | end_time          | duration|
+--------+--------------+------+-----------------------+-------------------+---------+
|    78aa|  2020-04-14  | 3    |    2020-04-14 19:00:00|2020-04-14 19:55:00|55       |
|    25aa|  2020-04-15  | 7    |    2020-04-15 08:00:00|2020-04-15 08:02:59|3        |
|    25aa|  2020-04-15  | 7    |    2020-04-15 11:03:00|2020-04-15 11:47:00|44       |
+--------+--------------+------+-----------------------+-------------------+---------+

【Question discussion】:

    Tags: apache-spark pyspark apache-spark-sql databricks


    【Solution 1】:

    Try this. Use window functions to create an additional column that groups the timings where they succeed each other by 1 second.

    from pyspark.sql import functions as F
    from pyspark.sql.window import Window

    w = Window.partitionBy("id", "date", "group").orderBy("start_time")

    # Seconds between this row's start and the previous row's end (null on the first row)
    gap = F.unix_timestamp("start_time") - F.lag(F.unix_timestamp("end_time")).over(w)

    # Running count of gaps longer than 1 second: rows sharing a value form one session
    df.withColumn("check", F.sum(F.when(gap > 1, F.lit(1)).otherwise(F.lit(0))).over(w))\
      .groupBy("date", "id", "group", "check")\
      .agg(F.min("start_time").alias("start_time"),  # min/max do not depend on row order
           F.max("end_time").alias("end_time"),
           F.sum("duration").alias("duration"))\
      .drop("check").show()
    
    #+----------+----+-----+-------------------+-------------------+--------+
    #|      date|  id|group|         start_time|           end_time|duration|
    #+----------+----+-----+-------------------+-------------------+--------+
    #|2020-04-14|78aa|    3|2020-04-14 19:00:00|2020-04-14 19:55:00|      55|
    #|2020-04-15|25aa|    7|2020-04-15 08:00:00|2020-04-15 08:02:59|       3|
    #|2020-04-15|25aa|    7|2020-04-15 11:03:00|2020-04-15 11:47:00|      44|
    #+----------+----+-----+-------------------+-------------------+--------+
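To see why the running sum in the `check` column separates sessions, the same logic can be traced in plain Python on the four `25aa` rows from the question. This is only an illustrative sketch of the technique, not a replacement for the Spark code, and the names `rows`, `flags`, `check`, and `sessions` are introduced here for the walk-through.

```python
from datetime import datetime
from itertools import accumulate

FMT = "%Y-%m-%d %H:%M:%S"

# (start_time, end_time, duration) for id 25aa on 2020-04-15, sorted by start_time
rows = [
    ("2020-04-15 08:00:00", "2020-04-15 08:02:59", 3),
    ("2020-04-15 11:03:00", "2020-04-15 11:11:59", 9),
    ("2020-04-15 11:12:00", "2020-04-15 11:45:59", 34),
    ("2020-04-15 11:46:00", "2020-04-15 11:47:00", 1),
]

def parse(ts):
    return datetime.strptime(ts, FMT)

# Flag is 1 when the gap to the previous row's end exceeds 1 second, else 0.
# The first row has no previous row, so its flag is 0 (the null lag case in Spark).
flags = [0] + [
    1 if (parse(cur[0]) - parse(prev[1])).total_seconds() > 1 else 0
    for prev, cur in zip(rows, rows[1:])
]

# Running sum of the flags, the plain-Python analogue of F.sum(...).over(w)
check = list(accumulate(flags))

# Collapse rows that share a check value into one session
sessions = {}
for (start, end, duration), key in zip(rows, check):
    session = sessions.setdefault(key, [start, end, 0])
    session[0] = min(session[0], start)
    session[1] = max(session[1], end)
    session[2] += duration

print(flags)  # [0, 1, 0, 0]
print(check)  # [0, 1, 1, 1]
for session in sessions.values():
    print(session)
```

Each flagged row starts a new session, and every row after it inherits the same running total until the next gap, which is what makes `check` usable as a grouping key.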
    

    【Discussion】:

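      【Solution 2】:
          import org.apache.spark.sql.functions._
          // Needed for toDF and the 'column syntax outside spark-shell; assumes a SparkSession named spark
          import spark.implicits._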
      
          val df0 = Seq(
            ("78aa", "2020-04-14", 3, "2020-04-14 19:00:00", "2020-04-14 19:23:59", 24),
            ("78aa", "2020-04-14", 3, "2020-04-14 19:24:00", "2020-04-14 19:26:59", 4),
            ("78aa", "2020-04-14", 3, "2020-04-14 19:27:00", "2020-04-14 19:35:59", 8),
            ("78aa", "2020-04-14", 3, "2020-04-14 19:36:00", "2020-04-14 19:55:00", 19),
            ("25aa", "2020-04-15", 7, "2020-04-15 08:00:00", "2020-04-15 08:02:59", 3),
            ("25aa", "2020-04-15", 7, "2020-04-15 11:03:00", "2020-04-15 11:11:59", 9),
            ("25aa", "2020-04-15", 7, "2020-04-15 11:12:00", "2020-04-15 11:45:59", 34),
            ("25aa", "2020-04-15", 7, "2020-04-15 11:46:00", "2020-04-15 11:47:00", 1)
          ).toDF("id", "date", "group", "start_time", "end_time", "duration")
      
          // Bucket rows by clock hour; "yyyy" is the calendar year ("YYYY" is the week-based year)
          val df1 = df0.withColumn("start_time_1", date_format('start_time, "yyyy-MM-dd HH"))
      
          df1.show(false)
      
          val res = df1.groupBy("id", "date", "group", "start_time_1")
            .agg(min('start_time).alias("start_time"), max('end_time).alias("end_time"), sum('duration).alias("duration"))
            .orderBy('start_time.asc)
            .drop("start_time_1")
      
          res.show(false)
      //    +----+----------+-----+-------------------+-------------------+--------+
      //    |id  |date      |group|start_time         |end_time           |duration|
      //    +----+----------+-----+-------------------+-------------------+--------+
      //    |78aa|2020-04-14|3    |2020-04-14 19:00:00|2020-04-14 19:55:00|55      |
      //    |25aa|2020-04-15|7    |2020-04-15 08:00:00|2020-04-15 08:02:59|3       |
      //    |25aa|2020-04-15|7    |2020-04-15 11:03:00|2020-04-15 11:47:00|44      |
      //    +----+----------+-----+-------------------+-------------------+--------+
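Grouping on the hour prefix reproduces the desired output here because every session in the sample sits inside a single clock hour. A small plain-Python sketch shows what happens otherwise; the two rows below are hypothetical (they are not from the question) and are contiguous across 20:00.

```python
from collections import defaultdict

# Hypothetical rows (not from the question): 1 second apart, crossing 20:00
rows = [
    ("2020-04-14 19:36:00", "2020-04-14 19:59:59", 24),
    ("2020-04-14 20:00:00", "2020-04-14 20:10:00", 10),
]

# Bucket by the first 13 characters, i.e. the "yyyy-MM-dd HH" key
buckets = defaultdict(list)
for start, end, duration in rows:
    buckets[start[:13]].append((start, end, duration))

# Aggregate each bucket the same way as min/max/sum in the groupBy
merged = [
    (min(r[0] for r in group), max(r[1] for r in group), sum(r[2] for r in group))
    for group in buckets.values()
]

for row in merged:
    print(row)
```

The contiguous pair comes out as two rows instead of one, and, conversely, two separate sessions that start in the same hour would be merged. If that matters for the real data, a gap-based grouping key is the safer choice.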
      

      【Discussion】:
