【问题标题】:Pyspark: Replacing value in a column by searching a dictionaryPyspark:通过搜索字典替换列中的值
【发布时间】:2017-10-14 01:06:57
【问题描述】:

我是 PySpark 的新手。

我有一个 Spark DataFrame df,它有一列“device_type”。

我想将“Tablet”或“Phone”中的每个值替换为“Phone”,并将“PC”替换为“Desktop”。

在 Python 中,我可以执行以下操作,

deviceDict = {'Tablet':'Mobile','Phone':'Mobile','PC':'Desktop'}
df['device_type'] = df['device_type'].replace(deviceDict,inplace=False)

如何使用 PySpark 实现这一点?谢谢!

【问题讨论】:

    标签: python apache-spark dataframe pyspark apache-spark-sql


    【解决方案1】:

    您可以使用na.replace

    df = spark.createDataFrame([
        ('Tablet', ), ('Phone', ),  ('PC', ), ('Other', ), (None, )
    ], ["device_type"])
    
    df.na.replace(deviceDict, 1).show()
    
    +-----------+
    |device_type|
    +-----------+
    |     Mobile|
    |     Mobile|
    |    Desktop|
    |      Other|
    |       null|
    +-----------+
    

    或地图文字:

    from itertools import chain
    from pyspark.sql.functions import create_map, lit
    
    mapping = create_map([lit(x) for x in chain(*deviceDict.items())])
    
    
    df.select(mapping[df['device_type']].alias('device_type'))
    
    +-----------+
    |device_type|
    +-----------+
    |     Mobile|
    |     Mobile|
    |    Desktop|
    |       null|
    |       null|
    +-----------+
    

    请注意,后一种解决方案会将映射中不存在的值转换为NULL。如果这不是您想要的行为,您可以添加coalesce

    from pyspark.sql.functions import coalesce
    
    
    df.select(
        coalesce(mapping[df['device_type']], df['device_type']).alias('device_type')
    )
    
    +-----------+
    |device_type|
    +-----------+
    |     Mobile|
    |     Mobile|
    |    Desktop|
    |      Other|
    |       null|
    +-----------+
    

    【讨论】:

    • 您好。即使是一年多之后:我想使用 pyspark 2.1 的映射方法。但是,与示例相反,当我的表包含“NULL”条目时,我收到错误:“Py4JJavaError:调用 o6564.collectToPython 时发生错误。:java.lang.RuntimeException:不能使用 null 作为映射键!”。我是否误解了这一点,或者您能否提示问题的根源?谢谢
    【解决方案2】:

    这是一个受 R recode 函数启发的小辅助函数,它抽象了前面的答案。作为奖励,它添加了默认值选项。

    from itertools import chain
    from pyspark.sql.functions import col, create_map, lit, when, isnull
    from pyspark.sql.column import Column
    
    df = spark.createDataFrame([
        ('Tablet', ), ('Phone', ),  ('PC', ), ('Other', ), (None, )
    ], ["device_type"])
    
    deviceDict = {'Tablet':'Mobile','Phone':'Mobile','PC':'Desktop'}
    
    df.show()
    +-----------+
    |device_type|
    +-----------+
    |     Tablet|
    |      Phone|
    |         PC|
    |      Other|
    |       null|
    +-----------+
    

    这里是recode的定义。

    def recode(col_name, map_dict, default=None):
        if not isinstance(col_name, Column): # Allows either column name string or column instance to be passed
            col_name = col(col_name)
        mapping_expr = create_map([lit(x) for x in chain(*map_dict.items())])
        if default is None:
            return  mapping_expr.getItem(col_name)
        else:
            return when(~isnull(mapping_expr.getItem(col_name)), mapping_expr.getItem(col_name)).otherwise(default)
    

    创建没有默认值的列会在所有不匹配的值中提供null/None

    df.withColumn("device_type", recode('device_type', deviceDict)).show()
    
    +-----------+
    |device_type|
    +-----------+
    |     Mobile|
    |     Mobile|
    |    Desktop|
    |       null|
    |       null|
    +-----------+
    

    另一方面,为default 指定一个值会将所有不匹配的值替换为此默认值。

    df.withColumn("device_type", recode('device_type', deviceDict, default='Other')).show()
    
    +-----------+
    |device_type|
    +-----------+
    |     Mobile|
    |     Mobile|
    |    Desktop|
    |      Other|
    |      Other|
    +-----------+
    

    【讨论】:

    • 如何避免硬编码“device_type”? @yardsale8
    • 由于device_type 是一个列名,我不确定您是否想将其抽象出来。如果你这样做了,你可以把表达式放在一个以df、列名和翻译字典作为参数的函数中。
    【解决方案3】:

    经过大量搜索和替代,我认为使用 python dict 替换的最简单方法是使用 pyspark 数据框方法replace

    deviceDict = {'Tablet':'Mobile','Phone':'Mobile','PC':'Desktop'}
    df_replace = df.replace(deviceDict,subset=['device_type'])
    

    这将用 dict 替换所有值,如果您传递 dict 参数和子集参数,您可以使用 df.na.replace() 获得相同的结果。他的docs 不够清楚,因为如果你搜索函数replace 你会得到两个引用,一个在pyspark.sql.DataFrame.replace 内部,另一个在pyspark.sql.DataFrameNaFunctions.replace 内部,但是两个引用的示例代码都使用@ 987654328@ 所以不清楚你是否可以实际使用df.replace

    【讨论】:

      【解决方案4】:

      您也可以使用df.withColumn 来做到这一点:

      from itertools import chain
      from pyspark.sql.functions import create_map, lit
      
      deviceDict = {'Tablet':'Mobile','Phone':'Mobile','PC':'Desktop'}
      
      mapping_expr = create_map([lit(x) for x in chain(*deviceDict.items())])
      
      df = df.withColumn('device_type', mapping_expr[df['dvice_type']])
      df.show()
      

      【讨论】:

      • 如何用 scala 语言实现?
      • @mytabi 我认为 scala 和 spark 没有 create_maplit。然而,Scala 中的 matchcase 可以作为实现相同结果的替代解决方案。
      【解决方案5】:

      最简单的方法是在您的数据框上应用udf

          from pyspark.sql.functions import col , udf
      
          deviceDict = {'Tablet':'Mobile','Phone':'Mobile','PC':'Desktop'}
          map_func = udf(lambda row : deviceDict.get(row,row))
          df = df.withColumn("device_type", map_func(col("device_type")))
      

      【讨论】:

      • 谢谢..如果值与列不匹配并设置None,有一个选项?
      • 正确的做法:def mapping_func(x,deviceDict): try: return deviceDict.get(x,x) except: return None map_func = udf(lambda row : mapping_func(row)) df = df.withColumn("device_type", map_func(col("device_type")))
      【解决方案6】:

      解决此问题的另一种方法是在传统 sql 中使用 CASE WHEN,但使用 f-strings 并使用 python 字典和 .join 自动生成 CASE WHEN 语句:

      column = 'device_type' #column to replace
      
      e = f"""CASE {' '.join([f"WHEN {column}='{k}' THEN '{v}'" 
                   for k,v in deviceDict.items()])} ELSE {column} END"""
      
      df.withColumn(column,F.expr(e)).show()
      
      +-----------+
      |device_type|
      +-----------+
      |     Mobile|
      |     Mobile|
      |    Desktop|
      |      Other|
      |       null|
      +-----------+
      

      注意: 如果要返回键不匹配的NULL,只需在变量e的case语句中将ELSE {column} END更改为ELSE NULL END

      column = 'device_type' #column to replace
      
      e = f"""CASE {' '.join([f"WHEN {column}='{k}' THEN '{v}'" 
                   for k,v in deviceDict.items()])} ELSE NULL END"""
      
      df.withColumn('New_Col',F.expr(e)).show()
      
      +-----------+-------+
      |device_type|New_Col|
      +-----------+-------+
      |     Tablet| Mobile|
      |      Phone| Mobile|
      |         PC|Desktop|
      |      Other|   null|
      |       null|   null|
      +-----------+-------+
      

      【讨论】:

        猜你喜欢
        • 2021-12-29
        • 1970-01-01
        • 1970-01-01
        • 1970-01-01
        • 1970-01-01
        • 1970-01-01
        • 2018-08-21
        • 2020-01-21
        • 2022-08-17
        相关资源
        最近更新 更多