# Higher-order function `transform`: rank each element of `col` by looking it up in a dictionary, then pick the element with the lowest rank.
from pyspark.sql import functions as F
from itertools import chain
# Sample input: an id plus a comma-separated string of labels. The last row
# has a null string instead of labels.
data = [
    (1, "A,B,C"),
    (2, "D,C"),
    (3, "B,C,A"),
    (4, None),
]
df = spark.createDataFrame(data, ("id", "col"))

# Rank assigned to each label; the pipeline below keeps the label whose rank
# number is smallest.
d = {"A": 1, "B": 2, "C": 3, "D": 4}

# Flatten the dict into key, value, key, value, ... literals and build a
# single map column from them.
mapper = F.create_map(*map(F.lit, chain.from_iterable(d.items())))
"""
Mapper has the value Column<'map(A, 1, B, 2, C, 3, D, 4)'>
"""
(
    df
    # Split the comma-separated string into an array of labels.
    .withColumn("col", F.split("col", ","))
    # Attach the lookup map as a column so the SQL lambda below can use it.
    .withColumn("mapper", mapper)
    # For every label, build struct(rank, col) with the rank read from mapper.
    .withColumn("col", F.expr("transform(col, x -> struct(mapper[x] as rank, x as col))"))
    # array_min finds the minimum based on the first struct field (rank);
    # keep only the label of that struct.
    .withColumn("col", F.array_min("col")["col"])
    .select("id", "col")
    .show()
)
"""
+---+----+
| id| col|
+---+----+
| 1| A|
| 2| C|
| 3| A|
| 4|null|
+---+----+
"""