【发布时间】:2020-10-21 11:07:12
【问题描述】:
假设我有一个 DataFrame,其中包含来自不同用户通过不同协议和记录的指标值的请求:
+---+-----+--------+------------+
| ts| user|protocol|metric_value|
+---+-----+--------+------------+
| 0|user1| tcp| 197|
| 1|user1| udp| 155|
| 2|user1| tcp| 347|
| 3|user1| tcp| 117|
| 4|user1| tcp| 230|
| 5|user1| udp| 225|
| 6|user1| udp| 297|
| 7|user1| tcp| 790|
| 8|user1| udp| 216|
| 9|user1| udp| 200|
+---+-----+--------+------------+
我需要添加另一列,其中将为当前用户的每个协议添加最后记录的平均 metric_value(在当前时间戳之前且不早于 current_ts - 4)。 所以,算法是这样的:
- 对于每一行 X:
- 查找 row.user == X.user 和 row.ts
- 从这些行中提取每个协议的最新 metric_value(如果相应的记录早于 X.ts - 4,则将其丢弃)
- 计算这些 metric_values 的平均值
- 将计算的平均值附加到新列中的行
想要的结果应该是这样的:
+---+-----+--------+------------+-------+
| ts| user|protocol|metric_value|avg_val|
+---+-----+--------+------------+-------+
| 0|user1| tcp| 197| null| // no data for user1
| 1|user1| udp| 155| 197| // only tcp value available
| 2|user1| tcp| 347| 176| // (197 + 155) / 2
| 3|user1| tcp| 117| 251| // (347 + 155) / 2
| 4|user1| tcp| 230| 136| // (117 + 155) / 2
| 5|user2| udp| 225| null| // because no data for user2
| 6|user1| udp| 297| 230| // because record with ts==1 is too old now
| 7|user1| tcp| 790| 263.5| // (297 + 230) / 2
| 8|user1| udp| 216| 543.5| // (297 + 790) / 2
| 9|user1| udp| 200| 503| // (216 + 790) / 2
+---+-----+--------+------------+-------+
请注意,表中可能有任意数量的协议和用户。
如何实现?
我尝试过使用窗口函数、lag(1) 和按协议分区,但聚合函数只计算单个分区的平均值,而不是不同的分区结果。 最接近的结果是使用 row_number 按协议分区的 sql 请求,但我无法在那里传播 row.ts
【问题讨论】:
标签: apache-spark pyspark apache-spark-sql