【Question title】: Conditional Concatenation in Spark
【Posted】: 2020-04-22 18:30:16
【Question description】:

I have a DataFrame with the following structure:

+----------+------+------+----------------+--------+------+
|      date|market|metric|aggregator_Value|type    |rank  |
+----------+------+------+----------------+--------+------+
|2018-08-05|    m1|   16 |              m1|median  |  1   |
|2018-08-03|    m1|    5 |              m1|median  |  2   |
|2018-08-01|    m1|   10 |              m1|mean    |  3   |
|2018-08-05|    m2|   35 |              m2|mean    |  1   |
|2018-08-03|    m2|   25 |              m2|mean    |  2   |
|2018-08-01|    m2|    5 |              m2|mean    |  3   |
+----------+------+------+----------------+--------+------+

In this DataFrame, the rank column is computed within each market, ordered by date descending, like this:

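import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.{desc, rank}
val w_rank = Window.partitionBy("market").orderBy(desc("date"))
val outputDF2 = outputDF1.withColumn("rank", rank().over(w_rank))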

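For the rank = 1 row of each market, I want the output DataFrame to contain the concatenated values of the metric column, subject to a condition: if type = "median" in the rank = 1 row, concatenate all metric values of that market; otherwise, if type = "mean" in the rank = 1 row, concatenate only the top 2 metric values. Like this: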

+----------+------+------+----------------+--------+---------+
|      date|market|metric|aggregator_Value|type    |result   |
+----------+------+------+----------------+--------+---------+
|2018-08-05|    m1|   16 |              m1|median  |10|5|16  |
|2018-08-05|    m2|   35 |              m2|mean    |25|35    |
+----------+------+------+----------------+--------+---------+    
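
To pin down the intended semantics, the rule can be restated in plain Scala collections, without Spark. This is only an illustrative sketch: `Record` and `conditionalConcat` are hypothetical names, and keeping every metric for any type other than "mean" is an assumption, since the description only covers "median" and "mean".

```scala
// Plain Scala, no Spark required. Record and conditionalConcat are
// hypothetical names introduced only for this illustration.
final case class Record(date: String, market: String, metric: Int, tpe: String)

def conditionalConcat(rows: Seq[Record]): Map[String, String] =
  rows.groupBy(_.market).map { case (market, group) =>
    // Newest first, mirroring rank() over orderBy(desc("date")).
    // ISO-8601 date strings sort correctly as plain strings.
    val newestFirst = group.sortBy(_.date).reverse
    // The type of the rank = 1 row decides how many metrics are kept.
    val kept = newestFirst.head.tpe match {
      case "mean" => newestFirst.take(2) // only the top 2 rows
      case _      => newestFirst         // "median" (and, by assumption, anything else): all rows
    }
    // Concatenate from oldest to newest, as in the expected output.
    market -> kept.reverse.map(_.metric).mkString("|")
  }

val rows = Seq(
  Record("2018-08-05", "m1", 16, "median"),
  Record("2018-08-03", "m1", 5, "median"),
  Record("2018-08-01", "m1", 10, "mean"),
  Record("2018-08-05", "m2", 35, "mean"),
  Record("2018-08-03", "m2", 25, "mean"),
  Record("2018-08-01", "m2", 5, "mean")
)

val result = conditionalConcat(rows)
// result("m1") is "10|5|16" and result("m2") is "25|35"
```
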

How can I achieve this?

【Question comments】:

    Tags: scala apache-spark apache-spark-2.0


    【Solution 1】:

    You can null out the metric column according to the condition, then apply collect_list followed by concat_ws to get the desired result, as shown below:

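    // Assumes spark-shell, or any scope where a SparkSession named `spark` exists.
    import org.apache.spark.sql.expressions.Window
    import org.apache.spark.sql.functions._
    import spark.implicits._

    val df = Seq(
      ("2018-08-05", "m1", 16, "m1", "median", 1),
      ("2018-08-03", "m1",  5, "m1", "median", 2),
      ("2018-08-01", "m1", 10, "m1", "mean",   3),
      ("2018-08-05", "m2", 35, "m2", "mean",   1),
      ("2018-08-03", "m2", 25, "m2", "mean",   2),
      ("2018-08-01", "m2",  5, "m2", "mean",   3)
    ).toDF("date", "market", "metric", "aggregator_value", "type", "rank")
    
    val win_desc = Window.partitionBy("market").orderBy(desc("date"))
    val win_asc = Window.partitionBy("market").orderBy(asc("date"))
    
    df.
      // Type of the rank = 1 (latest) row, propagated to every row of the market.
      withColumn("rank1_type", first($"type").over(win_desc.rowsBetween(Window.unboundedPreceding, 0))).
      // For "mean" markets, null out the metric of every row beyond the top 2.
      withColumn("cond_metric", when($"rank" > 2 && $"rank1_type" === "mean", null).otherwise($"metric")).
      // collect_list skips nulls; on the latest row the ascending window covers the whole market.
      withColumn("result", concat_ws("|", collect_list("cond_metric").over(win_asc))).
      where($"rank" === 1).
      show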
    // +----------+------+------+----------------+------+----+----------+-----------+-------+
    // |      date|market|metric|aggregator_value|  type|rank|rank1_type|cond_metric| result|
    // +----------+------+------+----------------+------+----+----------+-----------+-------+
    // |2018-08-05|    m1|    16|              m1|median|   1|    median|         16|10|5|16|
    // |2018-08-05|    m2|    35|              m2|  mean|   1|      mean|         35|  25|35|
    // +----------+------+------+----------------+------+----+----------+-----------+-------+
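
    The approach above depends on collect_list dropping null values, which is why nulling out cond_metric removes those rows from the concatenation. A minimal sketch of that behavior in isolation follows. It assumes a local SparkSession; the names `demo`, `fullFrame`, and `joined` are made up for illustration, the explicit cast to string is a precaution rather than something the answer requires, and the window frame is spelled out in full instead of relying on the default frame.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.{collect_list, concat_ws}

// Local session for the demo; in spark-shell, getOrCreate returns the existing one.
val spark = SparkSession.builder()
  .master("local[1]")
  .appName("collect-list-nulls")
  .getOrCreate()
import spark.implicits._

// Option[Int] yields a nullable integer column; None stands in for a
// metric that was nulled out by the when(...).otherwise(...) step.
val demo = Seq(
  ("2018-08-01", Option.empty[Int]),
  ("2018-08-03", Option(25)),
  ("2018-08-05", Option(35))
).toDF("date", "cond_metric")

// Explicit full frame: every row sees all rows, ordered by ascending date.
// (A window without partitionBy moves all data to one partition; fine for a demo.)
val fullFrame = Window
  .orderBy("date")
  .rowsBetween(Window.unboundedPreceding, Window.unboundedFollowing)

val joined = demo
  .withColumn("result",
    concat_ws("|", collect_list($"cond_metric".cast("string")).over(fullFrame)))
  .select("result")
  .distinct()
  .as[String]
  .head()

// The null row contributes nothing to the list.
println(joined) // 25|35
```

    With an explicit full frame, every row of a partition carries the complete concatenation, so the final filter on rank = 1 only selects which row to keep and no longer affects the content of result.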
    

    【Discussion】:
