【问题标题】:PySpark: create column based on value and dictionary in columnsPySpark:根据列中的值和字典创建列
【发布时间】:2022-11-29 17:19:45
【问题描述】:

我有一个 PySpark 数据框,其中包含值和为值提供文本映射的字典。 并非每一行都有相同的字典,值也可能不同。

| value    | dict                                           | 
| -------- | ---------------------------------------------- |
| 1        | {"1": "Text A", "2": "Text B"}                 |
| 2        | {"1": "Text A", "2": "Text B"}                 |
| 0        | {"0": "Another text A", "1": "Another text B"} |

我想创建一个包含正确映射的“状态”列。


| value    | dict                             | status   |
| -------- | -------------------------------  | -------- |
| 1        | {"1": "Text A", "2": "Text B"}   | Text A   |
| 2        | {"1": "Text A", "2": "Text B"}   | Text B   |
| 0        | {"0": "Other A", "1": "Other B"} | Other A  |

我试过这段代码:

df.withColumn("status", F.col("dict").getItem(F.col("value"))

此代码不起作用。使用硬编码值,如“2”,相同的代码确实提供了输出,但当然不是正确的输出:

df.withColumn("status", F.col("dict").getItem("2"))

有人可以帮助我在状态列中获得正确的映射值吗?

编辑:我的代码确实有效,除了我的“值”是双精度值并且 dict 中的键是字符串。将列从 double 转换为 int 到 string 时,代码有效。

【问题讨论】:

    标签: python dictionary pyspark apache-spark-sql mapping


    【解决方案1】:

    这是我的 2 美分

    1. 通过从 CSV 或任何其他来源读取来创建数据框(在我的例子中它只是静态数据)

       from pyspark.sql.types import *
      
       data = [
       (1 , {"1": "Text A", "2": "Text B"}),
       (2 , {"1": "Text A", "2": "Text B"}),
       (0 , {"0": "Another text A", "1": "Another text B"} )
       ]
      
      
       schema = StructType([
                           StructField("ID",StringType(),True),
                           StructField("Dictionary",MapType(StringType(),StringType()),True),
                           ])
      
       df = spark.createDataFrame(data,schema=schema)
       df.show(truncate=False)
      
    2. 然后直接以id为key提取字典值。

      df.withColumn('extract',df.Dictionary[df.ID]).show(truncate=False)
      

      查看下图以供参考:

    【讨论】:

    • 谢谢您的回答!这确实有效。评估后,我发现我的原始代码也应该可以工作。我的问题是值(在你的例子中是 ID)是一个双精度值,在映射中它是一个字符串。
    【解决方案2】:

    希望这可以帮助。

    from pyspark.sql import SparkSession
    from pyspark.sql.functions import *
    from pyspark.sql.types import *
    import json
    
    
    if __name__ == '__main__':
        spark = SparkSession.builder.appName('Medium').master('local[1]').getOrCreate()
        df = spark.read.format('csv').option("header","true").option("delimiter","|").load("/Users/dshanmugam/Desktop/ss.csv")
        schema = StructType([
            StructField("1", StringType(), True)
        ])
    
    
        def return_value(data):
            key = data.split('-')[0]
            value = json.loads(data.split('-')[1])[key]
            return value
    
        returnVal = udf(return_value)
        df_new = df.withColumn("newCol",concat_ws("-",col("value"),col("dict"))).withColumn("result",returnVal(col("newCol")))
        df_new.select(["value","result"]).show(10,False)
    

    结果:

    +-----+--------------+
    |value|result        |
    +-----+--------------+
    |1    |Text A        |
    |2    |Text B        |
    |0    |Another text A|
    +-----+--------------+
    

    我正在使用 UDF。如果性能是一个问题,您可以尝试其他一些选项。

    【讨论】:

    • 非常感谢您的回答。不幸的是,性能是一个问题,所以 UDF 不是我的解决方案。
    • 哦好的!!您可以尝试使用 Pandas UDF 进行矢量化,以提高性能。
    猜你喜欢
    • 2022-01-24
    • 2017-04-10
    • 1970-01-01
    • 2019-02-09
    • 1970-01-01
    • 2022-12-21
    • 1970-01-01
    • 1970-01-01
    • 2018-08-25
    相关资源
    最近更新 更多