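This could be done with groupBy, but a window function is usually easier when you need every original row in the resulting DataFrame. We could use collect_list, but as the doc says, the order of the collected elements is non-deterministic. So let's collect (volume, keyword) tuples instead, with the volume negated so that an ascending sort puts the highest volume first: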
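```scala
// Assumes spark-shell (or any session where `spark` is a SparkSession in scope)
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions._
import spark.implicits._

// Build a small sample DataFrame from whitespace-separated text
val txt =
  """Parent Keyword Volume
    |P1 K1 100
    |P1 K2 200
    |P1 K3 150
    |P2 K4 100
    |P2 K5 200""".stripMargin
    .split("\n")                         // avoids the String.lines clash on JDK 11+
    .map(_.split("\\s+").mkString("|"))
    .toSeq
    .toDS()

val df = spark.read
  .option("inferSchema", true)           // Volume becomes an integer column
  .option("header", true)
  .option("delimiter", "|")
  .csv(txt)                              // csv(Dataset[String]) needs Spark 2.2+

// Every row of a parent gets all (-Volume, Keyword) pairs of that parent
val win = Window.partitionBy($"Parent")
val df1 =
  df.select($"Keyword",
    collect_list(struct(-$"Volume", $"Keyword")).over(win) as "rel")
```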
Now we almost have the format we want:

```scala
// array_sort is available from Spark 2.4
df1.select(array_sort($"rel") as "Related_keywords")
  .show(20, false)
```
Output:

```
+------------------------------------+
|Related_keywords                    |
+------------------------------------+
|[[-200, K5], [-100, K4]]            |
|[[-200, K5], [-100, K4]]            |
|[[-200, K2], [-150, K3], [-100, K1]]|
|[[-200, K2], [-150, K3], [-100, K1]]|
|[[-200, K2], [-150, K3], [-100, K1]]|
+------------------------------------+
```
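Why negate the volume? Spark compares structs field by field, much like Scala's lexicographic ordering on tuples, so an ascending sort on (-volume, keyword) lists the highest volume first and breaks ties by keyword. A small plain-Scala sketch of that idea (`NegatedOrdering` is a hypothetical name):

```scala
object NegatedOrdering {
  // Ascending tuple ordering: smallest first field first, ties broken by the second field.
  // With the volume negated, "smallest first" means "highest volume first".
  def byVolumeDesc(rel: Seq[(Int, String)]): Seq[(Int, String)] = rel.sorted

  def main(args: Array[String]): Unit = {
    val p1 = Seq((-100, "K1"), (-200, "K2"), (-150, "K3"))
    println(byVolumeDesc(p1)) // List((-200,K2), (-150,K3), (-100,K1))
  }
}
```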
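However, there are two problems: each row's own Keyword is repeated in its list, and every keyword is preceded by its negated volume. To tidy this up I think a UDF is needed (I couldn't find an SQL function to unpack the tuples):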
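```scala
import org.apache.spark.sql.Row

// Drop the row's own keyword, sort by negated volume (highest volume first), keep only keywords.
// Volume was inferred as an integer column, so the negated value arrives as Int.
val myudf = udf(
  (keyword: String, rel: Seq[Row]) =>
    rel
      .collect {
        case Row(volume: Int, kw: String) if kw != keyword => (volume, kw)
      }
      .sortBy(_._1)
      .map(_._2))

df1.select($"Keyword", myudf($"Keyword", $"rel") as "Related_keywords")
  .show(20, false)
```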
Output:

```
+-------+----------------+
|Keyword|Related_keywords|
+-------+----------------+
|K4     |[K5]            |
|K5     |[K4]            |
|K1     |[K2, K3]        |
|K2     |[K3, K1]        |
|K3     |[K2, K1]        |
+-------+----------------+
```
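The UDF body itself is ordinary Scala collection code, so it can be checked against the output table above without Spark. A minimal sketch, assuming the struct fields are modelled as plain (negatedVolume, keyword) tuples instead of `Row`s; `RelatedKeywords` is a hypothetical name:

```scala
object RelatedKeywords {
  // Same steps as myudf: drop the row's own keyword, sort ascending on the negated volume
  // (= highest volume first), then keep only the keywords
  def related(keyword: String, rel: Seq[(Int, String)]): Seq[String] =
    rel
      .filter { case (_, kw) => kw != keyword }
      .sortBy(_._1) // stable sort: equal volumes keep their incoming order
      .map(_._2)

  def main(args: Array[String]): Unit = {
    val p1 = Seq((-100, "K1"), (-200, "K2"), (-150, "K3"))
    for (k <- Seq("K1", "K2", "K3"))
      println(s"$k -> ${related(k, p1).mkString("[", ", ", "]")}")
  }
}
```

Because `sortBy` is stable, keywords with equal volume keep whatever order `collect_list` produced, which is not deterministic; sorting on the whole tuple (`.sorted`) would break such ties by keyword, as `array_sort` does.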