【发布时间】:2020-04-22 18:30:16
【问题描述】:
我有一个具有以下结构的数据框:
+----------+------+------+----------------+--------+------+
| date|market|metric|aggregator_Value|type |rank |
+----------+------+------+----------------+--------+------+
|2018-08-05| m1| 16 | m1|median | 1 |
|2018-08-03| m1| 5 | m1|median | 2 |
|2018-08-01| m1| 10 | m1|mean | 3 |
|2018-08-05| m2| 35 | m2|mean | 1 |
|2018-08-03| m2| 25 | m2|mean | 2 |
|2018-08-01| m2| 5 | m2|mean | 3 |
+----------+------+------+----------------+--------+------+
在此数据框中,排名列是根据市场列的日期和分组顺序计算的。 像这样
val w_rank = Window.partitionBy("market").orderBy(desc("date"))
val outputDF2=outputDF1.withColumn("rank",rank().over(w_rank))
我想在 rank=1 时提取输出数据框中指标列的连接值,条件是如果 rank=1 行中的 type="median" 然后将所有指标值与该市场连接.否则,如果 rank=1 行中的 type="mean" ,则仅连接前 2 个度量值。像这样
+----------+------+------+----------------+--------+---------+
| date|market|metric|aggregator_Value|type |result |
+----------+------+------+----------------+--------+---------+
|2018-08-05| m1| 16 | m1|median |10|5|16 |
|2018-08-05| m2| 35 | m1|mean |25|35 |
+----------+------+------+----------------+--------+---------+
我怎样才能做到这一点?
【问题讨论】:
标签: scala apache-spark apache-spark-2.0