【问题标题】:pyspark dataframe json column decomposepyspark数据框json列分解
【发布时间】:2020-11-24 01:43:52
【问题描述】:

我正在尝试分解 pyspark 数据框中的 json 列。

pyspark dataframe with json column to aggregate the json elements into a new column and remove duplicated的问题类似

但是这个新的 json 列有更复杂的结构。

数据框

 year month id json_col
 2010  08   5  {"my_p": [{"like": false, "p_id": "dfvefvsd"}, {"like": true, "p_id": "dvcdc"}], "p_id": "cdscas"} 

我需要一个新的 col:

year month id like  p_id
2010  8    5  false dfvefvsd
2010  8    5  true  dvcdc
2010  8    5  null  cdscas

如果在同一年、月、id中有重复的p_id,删除它。

从上面链接中学到的代码(感谢@Shu)

from pyspark.sql import functions as F
from pyspark.sql.types import *

t = spark.sql('select * from my_db.my_tab')

schema = ArrayType(
                StructType(
                  [
                    StructField('my_p', 
                                        StructType(
                                                  [StructField('p_id', StringType(), True),
                                                  StructField('like', BooleanType(), True)
                                                  ]
                                        ),
                               True), 
                   StructField('p_id', StringType(), True)
                  ]
                  
                )
            )

   t1 = t.withColumn('a_col', F.from_json('json_col', schema)).select('year', 'month', 'id', 'p_id', F.expr('transform(json_col, f -> f.p_id)').alias('tmp'))

   t1.groupBy("year","month", 'id', 'p_id').agg(F.to_json(F.array_distinct(F.flatten(F.collect_list(F.col("tmp"))))).alias("new_col")).show(10,False)

但是,只有第一个“p_id”是从 json_col 中分解出来的。

谢谢

【问题讨论】:

  • 你能发布可重现的输入数据框吗?

标签: sql dataframe pyspark


【解决方案1】:

试试这个

t.show()

#+----+-----+---+--------------------------------------------------------------------------------------------------+
#|year|month|id |json_col                                                                                          |
#+----+-----+---+--------------------------------------------------------------------------------------------------+
#|2010|08   |5  |{"my_p": [{"like": false, "p_id": "dfvefvsd"}, {"like": true, "p_id": "dvcdc"}], "p_id": "cdscas"}|
#+----+-----+---+--------------------------------------------------------------------------------------------------+


from pyspark.sql import functions as F
from pyspark.sql.window import Window

schema1='struct<my_p:array<struct<like:boolean,p_id:string>>,p_id:string>'

w=Window().partitionBy("p_id2").orderBy(F.lit(0))


t.withColumn("json_col", F.from_json("json_col",schema1))\
  .select("*","json_col.*").drop("json_col")\
  .withColumnRenamed("p_id","p_id2").select("*",F.expr("""inline(my_p)""")).drop("my_p")\
  .withColumn('num', F.row_number().over(w)).withColumn("p_id", F.when(F.col("num")==1, F.array("p_id2","p_id"))\
                                                                .otherwise(F.array("p_id"))).drop("num","p_id2")\
  .withColumn("p_id", F.explode("p_id")).show()

#+----+-----+---+-----+--------+
#|year|month| id| like|    p_id|
#+----+-----+---+-----+--------+
#|2010|   08|  5| true|   dvcdc|
#|2010|   08|  5|false|dfvefvsd|
#|2010|   08|  5|false|  cdscas|
#+----+-----+---+-----+--------+

【讨论】:

  • 谢谢,它有效。是否可以将“p_id2”的“like”值标记为“null”? json 列中的最后一个“p_id”对于“like”是“unknown”,所以我们更愿意将其标记为“null”,谢谢!
猜你喜欢
  • 1970-01-01
  • 2023-01-03
  • 1970-01-01
  • 2018-12-07
  • 2020-11-17
  • 2020-12-28
  • 2023-03-05
  • 1970-01-01
  • 2018-12-06
相关资源
最近更新 更多