鉴于您的输入数据框为
+---+---+---+
|A |B |C |
+---+---+---+
|1 |3 |1 |
|1 |8 |2 |
|1 |5 |3 |
|2 |2 |1 |
+---+---+---+
你可以得到以下输出
+---+---------------+
|A |B |
+---+---------------+
|1 |[3,1, 5,3, 8,2]|
|2 |[2,1] |
+---+---------------+
通过简单的groupBy、aggregations 和使用functions
df.orderBy("B").groupBy("A").agg(collect_list(concat_ws(",", col("B"), col("C"))) as "B")
您可以使用udf 函数来获得最终想要的结果
def joinString = udf((b: mutable.WrappedArray[String]) => {
b.mkString("/")
} )
newdf.withColumn("B", joinString(col("B"))).show(false)
你应该得到
+---+-----------+
|A |B |
+---+-----------+
|1 |3,1/5,3/8,2|
|2 |2,1 |
+---+-----------+
注意您需要import org.apache.spark.sql.functions._ 才能使上述所有功能起作用
已编辑
B 列是根据 B 列的初始值排序的
为此,您可以将 orderBy 部分删除为
import org.apache.spark.sql.functions._
val newdf = df.groupBy("A").agg(collect_list(concat_ws(",", col("B"), col("C"))) as "B")
def joinString = udf((b: mutable.WrappedArray[String]) => {
b.mkString("/")
} )
newdf.withColumn("B", joinString(col("B"))).show(false)
你应该得到输出为
+---+-----------+
|A |B |
+---+-----------+
|1 |3,1/8,2/5,3|
|2 |2,1 |
+---+-----------+