【问题标题】:PySpark "explode" dict in columnPySpark“爆炸”列中的字典
【发布时间】:2018-11-13 23:37:47
【问题描述】:

我在 spark 数据框中有一列“true_recoms”:

-RECORD 17----------------------------------------------------------------- 
item        | 20380109                                                                                                                                                                  
true_recoms | {"5556867":1,"5801144":5,"7397596":21}          

我需要“分解”这个专栏来得到这样的东西:

item        | 20380109                                                                                                                                                                  
recom_item  | 5556867
recom_cnt   | 1
..............
item        | 20380109                                                                                                                                                                  
recom_item  | 5801144
recom_cnt   | 5
..............
item        | 20380109                                                                                                                                                                  
recom_item  | 7397596
recom_cnt   | 21

我尝试使用 from_json 但它不起作用:

    schema_json = StructType(fields=[
        StructField("item", StringType()),
        StructField("recoms", StringType())
    ])
    df.select(col("true_recoms"),from_json(col("true_recoms"), schema_json)).show(5)

+--------+--------------------+------+
|    item|         true_recoms|true_r|
+--------+--------------------+------+
|31746548|{"32731749":3,"31...|   [,]|
|17359322|{"17359392":1,"17...|   [,]|
|31480894|{"31480598":1,"31...|   [,]|
| 7265665|{"7265891":1,"503...|   [,]|
|31350949|{"32218698":1,"31...|   [,]|
+--------+--------------------+------+
only showing top 5 rows

【问题讨论】:

    标签: python apache-spark pyspark explode


    【解决方案1】:

    架构定义不正确。您声明为struct,带有两个字符串字段

    • item
    • recoms

    虽然文档中没有两个字段。

    不幸的是 from_json 只能返回结构或结构数组,因此将其重新定义为

    MapType(StringType(), LongType())
    

    不是一个选项。

    我个人会使用udf

    from pyspark.sql.functions import udf, explode
    import json
    
    @udf("map<string, bigint>")
    def parse(s):
        try:
            return json.loads(s)
        except json.JSONDecodeError:
            pass 
    

    可以这样应用

    df = spark.createDataFrame(
        [(31746548, """{"5556867":1,"5801144":5,"7397596":21}""")],
        ("item", "true_recoms")
    )
    
    df.select("item",  explode(parse("true_recoms")).alias("recom_item", "recom_cnt")).show()
    # +--------+----------+---------+
    # |    item|recom_item|recom_cnt|
    # +--------+----------+---------+
    # |31746548|   5801144|        5|
    # |31746548|   7397596|       21|
    # |31746548|   5556867|        1|
    # +--------+----------+---------+
    

    【讨论】:

      猜你喜欢
      • 2021-11-09
      • 2016-11-07
      • 1970-01-01
      • 2020-12-10
      • 1970-01-01
      • 2022-06-14
      • 2023-04-06
      • 1970-01-01
      • 2021-12-19
      相关资源
      最近更新 更多