【Question title】: Aggregate while dropping duplicates in pyspark
【Posted】: 2020-06-21 19:33:02
【Question】:

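I want to run a groupby aggregation on a pyspark dataframe while dropping duplicates (keeping the last value) based on another column of that dataframe.

In short, I want to apply dropDuplicates to a GroupedData object, so that within each group I keep only one row per value of some column.

Example

For the dataframe below, a straightforward grouped aggregation would be: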

from pyspark.sql import functions

dataframe = spark.createDataFrame(
    [
        (1, "2020-01-01", 1, 1),
        (2, "2020-01-01", 2, 1),
        (3, "2020-01-02", 1, 1),
        (2, "2020-01-02", 1, 1)
    ],
    ("id", "ts", "feature", "h3")
).withColumn("ts", functions.col("ts").cast("timestamp"))

# +---+-------------------+-------+---+
# | id|                 ts|feature| h3|
# +---+-------------------+-------+---+
# |  1|2020-01-01 00:00:00|      1|  1|
# |  2|2020-01-01 00:00:00|      2|  1|
# |  3|2020-01-02 00:00:00|      1|  1|
# |  2|2020-01-02 00:00:00|      1|  1|
# +---+-------------------+-------+---+

aggregated = dataframe.groupby("h3",
  functions.window(
    timeColumn="ts",
    windowDuration="3 days",
    slideDuration="1 day",
  )
).agg(
  functions.sum("feature")
)
aggregated.show(truncate=False)

This produces the following dataframe:

+---+------------------------------------------+------------+
|h3 |window                                    |sum(feature)|
+---+------------------------------------------+------------+
|1  |[2019-12-30 00:00:00, 2020-01-02 00:00:00]|3           |
|1  |[2019-12-31 00:00:00, 2020-01-03 00:00:00]|5           |
|1  |[2020-01-01 00:00:00, 2020-01-04 00:00:00]|5           |
|1  |[2020-01-02 00:00:00, 2020-01-05 00:00:00]|2           |
+---+------------------------------------------+------------+
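To see where the sums 3, 5, 5, 2 come from, the sliding-window assignment can be modelled in plain Python. This is only a sketch, not Spark code: the helper names `sliding_windows` and `naive_sums` are made up for illustration, and it assumes windows are aligned to 1970-01-01 00:00:00 with no time zone offset and are half-open (start inclusive, end exclusive), which is how the `window` function is documented.

```python
from collections import defaultdict
from datetime import datetime, timedelta

EPOCH = datetime(1970, 1, 1)  # assumed alignment origin, no time zone offset


def sliding_windows(ts, duration, slide):
    """Return every half-open window [start, end) that contains ts, oldest first."""
    start = ts - (ts - EPOCH) % slide  # latest window start that is <= ts
    windows = []
    while start + duration > ts:
        windows.append((start, start + duration))
        start -= slide
    return windows[::-1]


rows = [  # (id, ts, feature, h3), same data as the question
    (1, datetime(2020, 1, 1), 1, 1),
    (2, datetime(2020, 1, 1), 2, 1),
    (3, datetime(2020, 1, 2), 1, 1),
    (2, datetime(2020, 1, 2), 1, 1),
]


def naive_sums(rows, duration=timedelta(days=3), slide=timedelta(days=1)):
    """Sum feature per (h3, window); every row counts in each window it falls in."""
    totals = defaultdict(int)
    for _id, ts, feature, h3 in rows:
        for window in sliding_windows(ts, duration, slide):
            totals[(h3, window)] += feature
    return dict(totals)


for (h3, (start, end)), total in sorted(naive_sums(rows).items()):
    print(h3, start.date(), end.date(), total)
```

Each row lands in three overlapping windows, so both states of id=2 (feature 2 and feature 1) are counted in the windows starting 2019-12-31 and 2020-01-01, which is why those two rows show 5.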

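Problem

I want the aggregation to use only the latest state of each id. Here, id=2 was updated at ts=2020-01-02 00:00:00 to feature=1, so every window that includes 2020-01-02 00:00:00 or later should use only this state of the feature column for id=2. The expected aggregated dataframe is: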

+---+------------------------------------------+------------+
|h3 |window                                    |sum(feature)|
+---+------------------------------------------+------------+
|1  |[2019-12-30 00:00:00, 2020-01-02 00:00:00]|3           |
|1  |[2019-12-31 00:00:00, 2020-01-03 00:00:00]|3           |
|1  |[2020-01-01 00:00:00, 2020-01-04 00:00:00]|3           |
|1  |[2020-01-02 00:00:00, 2020-01-05 00:00:00]|2           |
+---+------------------------------------------+------------+

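How can I do this with pyspark?

Update

I assumed that MapType values in Spark cannot have duplicate keys. Under that assumption, I thought I could aggregate the columns into a map id -> feature and then aggregate the map values with sum (or whatever the final aggregation should be).

So I did: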

aggregated = dataframe.groupby("h3",
  functions.window(
    timeColumn="ts",
    windowDuration="3 days",
    slideDuration="1 day",
  )
).agg(
  functions.map_from_entries(
    functions.collect_list(
      functions.struct("id","feature")
    )
  ).alias("id_feature")
)
aggregated.show(truncate=False)

But then I found that a map can have duplicate keys:

+---+------------------------------------------+--------------------------------+
|h3 |window                                    |id_feature                      |
+---+------------------------------------------+--------------------------------+
|1  |[2020-01-01 00:00:00, 2020-01-04 00:00:00]|[1 -> 1, 2 -> 2, 3 -> 1, 2 -> 1]|
|1  |[2019-12-31 00:00:00, 2020-01-03 00:00:00]|[1 -> 1, 2 -> 2, 3 -> 1, 2 -> 1]|
|1  |[2019-12-30 00:00:00, 2020-01-02 00:00:00]|[1 -> 1, 2 -> 2]                |
|1  |[2020-01-02 00:00:00, 2020-01-05 00:00:00]|[3 -> 1, 2 -> 1]                |
+---+------------------------------------------+--------------------------------+

So this does not solve my problem. Instead, I just found another one: when using the display function in a Databricks notebook, it shows the MapType column without duplicated keys.
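The behaviour the map idea was hoping for is "last value wins per key". A plain-Python model of that is sketched below. It is not Spark code and `last_win_sum` is a hypothetical helper name. The entries are sorted by ts first because the order of a collected list is not guaranteed. As far as I know, Spark 3.0 and later reject duplicate map keys by default and offer a `spark.sql.mapKeyDedupPolicy` setting (`EXCEPTION` or `LAST_WIN`), so the output shown above is Spark 2.4 behaviour.

```python
def last_win_sum(entries):
    """Build an id -> feature mapping where the latest ts wins, then sum the values.

    entries: iterable of (ts, id, feature) tuples collected for one group.
    """
    id_feature = {}
    # Sort by ts so that later records overwrite earlier ones for the same id.
    for _ts, id_, feature in sorted(entries, key=lambda e: e[0]):
        id_feature[id_] = feature
    return id_feature, sum(id_feature.values())


# Entries of the window starting 2019-12-31 in the question's data.
entries = [
    ("2020-01-01", 1, 1),
    ("2020-01-01", 2, 2),
    ("2020-01-02", 3, 1),
    ("2020-01-02", 2, 1),
]

mapping, total = last_win_sum(entries)
print(mapping, total)  # {1: 1, 2: 1, 3: 1} 3
```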

【Comments】:

    Tags: dataframe apache-spark pyspark apache-spark-sql databricks


    【Solution 1】:

    First, find the latest record for each id within each time window, then join those latest records back to the original dataframe.

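    from pyspark.sql import functions

    time_window = functions.window(timeColumn="ts", windowDuration="3 days", slideDuration="1 day")

    # Latest timestamp per (h3, window, id); `dataframe` is the dataframe from the question.
    df2 = dataframe.groupBy("h3", time_window, "id").agg(functions.max("ts").alias("latest"))

    # Join back on (id, latest ts) to fetch the matching feature, then sum per window.
    df2.alias("a").join(dataframe.alias("b"), (functions.col("a.id") == functions.col("b.id")) & (functions.col("a.latest") == functions.col("b.ts")), "left") \
       .select("a.*", "feature") \
       .groupBy("h3", "window") \
       .agg(functions.sum("feature")) \
       .orderBy("window") \
       .show(truncate=False)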
    

    The aggregated sums then match what you expected.

    +---+------------------------------------------+------------+
    |h3 |window                                    |sum(feature)|
    +---+------------------------------------------+------------+
    |1  |[2019-12-29 00:00:00, 2020-01-01 00:00:00]|3           |
    |1  |[2019-12-30 00:00:00, 2020-01-02 00:00:00]|3           |
    |1  |[2019-12-31 00:00:00, 2020-01-03 00:00:00]|3           |
    |1  |[2020-01-01 00:00:00, 2020-01-04 00:00:00]|2           |
    +---+------------------------------------------+------------+
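The two steps of this answer (latest ts per group, then a join back on id and ts) can be traced with a plain-Python model. This is a sketch for checking the logic, not Spark code. The helper names are made up, and window alignment to 1970-01-01 00:00:00 without a time zone offset is an assumption.

```python
from collections import defaultdict
from datetime import datetime, timedelta

EPOCH = datetime(1970, 1, 1)  # assumed alignment origin, no time zone offset


def sliding_windows(ts, duration=timedelta(days=3), slide=timedelta(days=1)):
    """Return every half-open window [start, end) that contains ts."""
    start = ts - (ts - EPOCH) % slide
    windows = []
    while start + duration > ts:
        windows.append((start, start + duration))
        start -= slide
    return windows


def latest_then_sum(rows):
    """rows: (id, ts, feature, h3) tuples. Returns {(h3, window): sum of latest features}."""
    # Step 1: max(ts) per (h3, window, id), like df2 in the answer.
    latest = {}
    for id_, ts, _feature, h3 in rows:
        for window in sliding_windows(ts):
            key = (h3, window, id_)
            latest[key] = max(latest.get(key, ts), ts)
    # Step 2: join back on (id, ts) and sum feature per (h3, window).
    totals = defaultdict(int)
    for (h3, window, id_), latest_ts in latest.items():
        for row_id, row_ts, feature, _h3 in rows:
            if row_id == id_ and row_ts == latest_ts:
                totals[(h3, window)] += feature
    return dict(totals)


rows = [  # same data as the question
    (1, datetime(2020, 1, 1), 1, 1),
    (2, datetime(2020, 1, 1), 2, 1),
    (3, datetime(2020, 1, 2), 1, 1),
    (2, datetime(2020, 1, 2), 1, 1),
]

for (h3, (start, end)), total in sorted(latest_then_sum(rows).items()):
    print(h3, start.date(), end.date(), total)
```

One thing the model makes visible: because the join matches on id and ts only, two rows that share both the same id and the same ts would both be joined and both be summed. If such ties can occur in the real data, they need a tie-breaker before the join.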
    

    【Comments】:

      【Solution 2】:

      Since you are on Spark 2.4+, one approach you can try is the Spark SQL aggregate function, see below:

      aggregated = dataframe.groupby("h3",
         functions.window( 
           timeColumn="ts", 
           windowDuration="3 days", 
           slideDuration="1 day", 
         ) 
       ).agg( 
           functions.sort_array(functions.collect_list( 
             functions.struct("ts", "id", "feature") 
           ), False).alias("id_feature") 
       )   
      

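      I added the ts field to the array of structs produced by functions.collect_list, and used functions.sort_array to sort the list by ts in descending order (so that when duplicates exist, the latest record is the one kept). In the aggregate function below, zero_value is a named struct with two fields: ids (MapType) caches every id processed so far, and total adds a feature only when the new id is not already present in the cached ids.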

      aggregated.selectExpr("h3", "window", """
        aggregate(
          id_feature,
          /* zero_value */
          (map() as ids, 0L as total), 
          /* merge */
          (acc, y) -> named_struct(
            /* add y.id into the ids map */
            'ids', map_concat(acc.ids, map(y.id,1)), 
            /* sum to total only when y.id doesn't exist in acc.ids map */
            'total', acc.total + IF(acc.ids[y.id] is null,y.feature,0)
          ), 
          /* finish, take only acc.total, discard acc.ids map */
          acc -> acc.total
        ) as id_feature
      
      """).show()
      +---+--------------------+----------+
      | h3|              window|id_feature|
      +---+--------------------+----------+
      |  1|[2020-01-01 00:00...|         3|
      |  1|[2019-12-31 00:00...|         3|
      |  1|[2019-12-30 00:00...|         3|
      |  1|[2020-01-02 00:00...|         2|
      +---+--------------------+----------+
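The fold performed by the aggregate expression can be followed in plain Python with `functools.reduce`. This is a sketch of the same idea, not Spark code: `sum_latest` is a hypothetical name, and a frozenset stands in for the ids map used as a "seen" cache.

```python
from functools import reduce


def sum_latest(id_feature):
    """id_feature: list of (ts, id, feature) tuples collected for one group."""
    # Descending sort, like sort_array(..., False): the newest record of each id comes first.
    ordered = sorted(id_feature, reverse=True)

    def merge(acc, y):
        ids, total = acc
        _ts, id_, feature = y
        if id_ in ids:
            # An older record of an id that was already counted: skip it.
            return acc
        return ids | {id_}, total + feature

    # zero value: (empty set of seen ids, running total 0)
    _ids, total = reduce(merge, ordered, (frozenset(), 0))
    return total  # finish step: keep the total, discard the seen ids


# Entries of the window starting 2019-12-31 in the question's data.
window_entries = [
    ("2020-01-01", 1, 1),
    ("2020-01-01", 2, 2),
    ("2020-01-02", 3, 1),
    ("2020-01-02", 2, 1),
]
print(sum_latest(window_entries))  # 3
```

Sorting newest-first is what makes the "first occurrence wins" rule equivalent to "latest state wins": id=2 is counted once with feature=1, and its older record with feature=2 is skipped.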
      

      【Comments】:
