【问题标题】:Pyspark window with aggregating results between groups具有组间聚合结果的 Pyspark 窗口
【发布时间】:2020-10-21 11:07:12
【问题描述】:

假设我有一个 DataFrame,其中包含来自不同用户通过不同协议和记录的指标值的请求:

+---+-----+--------+------------+
| ts| user|protocol|metric_value|
+---+-----+--------+------------+
|  0|user1|     tcp|         197|
|  1|user1|     udp|         155|
|  2|user1|     tcp|         347|
|  3|user1|     tcp|         117|
|  4|user1|     tcp|         230|
|  5|user1|     udp|         225|
|  6|user1|     udp|         297|
|  7|user1|     tcp|         790|
|  8|user1|     udp|         216|
|  9|user1|     udp|         200|
+---+-----+--------+------------+

我需要添加另一列,其中将为当前用户的每个协议添加最后记录的平均 metric_value(在当前时间戳之前且不早于 current_ts - 4)。 所以,算法是这样的:

  • 对于每一行 X:
    • 查找 row.user == X.user 和 row.ts
    • 从这些行中提取每个协议的最新 metric_value(如果相应的记录早于 X.ts - 4,则将其丢弃)
    • 计算这些 metric_values 的平均值
    • 将计算的平均值附加到新列中的行

想要的结果应该是这样的:

+---+-----+--------+------------+-------+
| ts| user|protocol|metric_value|avg_val|
+---+-----+--------+------------+-------+
|  0|user1|     tcp|         197|   null| // no data for user1
|  1|user1|     udp|         155|    197| // only tcp value available
|  2|user1|     tcp|         347|    176| // (197 + 155) / 2
|  3|user1|     tcp|         117|    251| // (347 + 155) / 2
|  4|user1|     tcp|         230|    136| // (117 + 155) / 2
|  5|user2|     udp|         225|   null| // because no data for user2
|  6|user1|     udp|         297|    230| // because record with ts==1 is too old now
|  7|user1|     tcp|         790|  263.5| // (297 + 230) / 2
|  8|user1|     udp|         216|  543.5| // (297 + 790) / 2
|  9|user1|     udp|         200|    503| // (216 + 790) / 2
+---+-----+--------+------------+-------+

请注意,表中可能有任意数量的协议和用户。

如何实现?

我尝试过使用窗口函数、lag(1) 和按协议分区,但聚合函数只计算单个分区的平均值,而不是不同的分区结果。 最接近的结果是使用 row_number 按协议分区的 sql 请求,但我无法在那里传播 row.ts

【问题讨论】:

    标签: apache-spark pyspark apache-spark-sql


    【解决方案1】:

    这是基于 Scala 的解决方案,您可以将逻辑转换为 Python/Pyspark

    样本数据:

    val df = Seq((0,"user1","tcp",197),(1,"user1","udp",155),(2,"user1","tcp",347),(3,"user1","tcp",117),(4,"user1","tcp",230),(5,"user2","udp",225),(6,"user1","udp",297),(7,"user1","tcp",790),(8,"user1","udp",216),(9,"user1","udp",200))
    .toDF("ts","user","protocol","metric_value")
    

    对于每一行,获取列表中current_row.ts -4 的所有行(protocol,metric_value)

    val winspec = Window.partitionBy("user").orderBy("ts").rangeBetween(Window.currentRow - 4, Window.currentRow-1)
    val df2 = df.withColumn("recent_list", collect_list(struct($"protocol", $"metric_value")).over(winspec))
    
    df2.orderBy("ts").show(false)
    /*
    
    +---+-----+--------+------------+------------------------------------------------+
    |ts |user |protocol|metric_value|recent_list                                       |
    +---+-----+--------+------------+------------------------------------------------+
    |0  |user1|tcp     |197         |[]                                              |
    |1  |user1|udp     |155         |[[tcp, 197]]                                    |
    |2  |user1|tcp     |347         |[[tcp, 197], [udp, 155]]                        |
    |3  |user1|tcp     |117         |[[tcp, 197], [udp, 155], [tcp, 347]]            |
    |4  |user1|tcp     |230         |[[tcp, 197], [udp, 155], [tcp, 347], [tcp, 117]]|
    |5  |user2|udp     |225         |[]                                              |
    |6  |user1|udp     |297         |[[tcp, 347], [tcp, 117], [tcp, 230]]            |
    |7  |user1|tcp     |790         |[[tcp, 117], [tcp, 230], [udp, 297]]            |
    |8  |user1|udp     |216         |[[tcp, 230], [udp, 297], [tcp, 790]]            |
    |9  |user1|udp     |200         |[[udp, 297], [tcp, 790], [udp, 216]]            |
    +---+-----+--------+------------+------------------------------------------------+
    

    现在您在一行中获得了所有必需的信息。您可以编写一个 UDF 来应用您获取最新协议类型和平均值的逻辑。

    
    def getAverageValueForUniqRecents(list : Array[StructType]): Double = {
      // you logic goes here. 
      // Loop through your array in REVERSE ORDER
      // maintain a set to check if protocol already visited then skip, otherwise SUM
      //Finally average
    }
    

    【讨论】:

      猜你喜欢
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 2023-02-23
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 2021-09-05
      相关资源
      最近更新 更多