这是一个受 R recode 函数启发的小辅助函数,它抽象了前面的答案。作为奖励,它添加了默认值选项。
from itertools import chain
from pyspark.sql.functions import col, create_map, lit, when, isnull
from pyspark.sql.column import Column
df = spark.createDataFrame([
('Tablet', ), ('Phone', ), ('PC', ), ('Other', ), (None, )
], ["device_type"])
deviceDict = {'Tablet':'Mobile','Phone':'Mobile','PC':'Desktop'}
df.show()
+-----------+
|device_type|
+-----------+
| Tablet|
| Phone|
| PC|
| Other|
| null|
+-----------+
这里是recode的定义。
def recode(col_name, map_dict, default=None):
if not isinstance(col_name, Column): # Allows either column name string or column instance to be passed
col_name = col(col_name)
mapping_expr = create_map([lit(x) for x in chain(*map_dict.items())])
if default is None:
return mapping_expr.getItem(col_name)
else:
return when(~isnull(mapping_expr.getItem(col_name)), mapping_expr.getItem(col_name)).otherwise(default)
创建没有默认值的列会在所有不匹配的值中提供null/None。
df.withColumn("device_type", recode('device_type', deviceDict)).show()
+-----------+
|device_type|
+-----------+
| Mobile|
| Mobile|
| Desktop|
| null|
| null|
+-----------+
另一方面,为default 指定一个值会将所有不匹配的值替换为此默认值。
df.withColumn("device_type", recode('device_type', deviceDict, default='Other')).show()
+-----------+
|device_type|
+-----------+
| Mobile|
| Mobile|
| Desktop|
| Other|
| Other|
+-----------+