以下内容应该有效,但请注意,它具有 posexplode(explode 和具有位置值的 array),如果您拥有庞大的数据集,这可能是一项成本高昂的操作。
val df = Seq(
("001", "aaa", "true,false,false"),
("002", "bbb", "true,true,true"),
("003", "ccc", "false,true,true")
).toDF("id", "name", "data")
val codes = Seq(
(0, "code1"),
(1, "code2"),
(2, "code3")
).toDF("code_id", "codes")
val df1 = df.select($"*", posexplode(split($"data", ",")))
.join(codes, $"pos" === $"code_id")
.withColumn( "codes", when($"col" === "false", $"codes").otherwise(null) )
//+---+----+----------------+---+-----+-------+-----+
//| id|name| data|pos| col|code_id|codes|
//+---+----+----------------+---+-----+-------+-----+
//|001| aaa|true,false,false| 0| true| 0| null|
//|001| aaa|true,false,false| 1|false| 1|code2|
//|001| aaa|true,false,false| 2|false| 2|code3|
//|002| bbb| true,true,true| 0| true| 0| null|
//|002| bbb| true,true,true| 1| true| 1| null|
//|002| bbb| true,true,true| 2| true| 2| null|
//|003| ccc| false,true,true| 0|false| 0|code1|
//|003| ccc| false,true,true| 1| true| 1| null|
//|003| ccc| false,true,true| 2| true| 2| null|
//+---+----+----------------+---+-----+-------+-----+
val finalDf = df1.groupBy($"id", $"name").agg(concat_ws(",", collect_list($"codes")).as("data"))
//+---+----+-----------+
//| id|name| data|
//+---+----+-----------+
//|002| bbb| |
//|001| aaa|code2,code3|
//|003| ccc| code1|
//+---+----+-----------+