【问题标题】:Convert Pyspark Dataframe column from array to new columns将 Pyspark Dataframe 列从数组转换为新列
【发布时间】:2017-12-18 18:04:23
【问题描述】:

我有一个具有这种结构的 Pyspark 数据框:

root
 |-- Id: string (nullable = true)
 |-- Q: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- pr: string (nullable = true)
 |    |    |-- qt: double (nullable = true)

类似于:

 +----+--------------------- ... --+
 | Id |           Q                |
 +----+---------------------- ... -+
 | 001| [ [pr1,1.9], [pr3,2.0]...] |
 | 002| [ [pr2,1.0], [pr9,3.9]...] |
 | 003| [ [pr2,9.0], ...         ] |
  ...

我希望将 Q 数组转换为列(名称 pr 值 qt)。 另外我想通过合并(添加)相同的列来避免重复的列。

 +----+-----+-----+------+ ... ----+
 | Id | pr1 | pr2 | pr3  | ... prn |
 +----+-----+-----+------+ ... ----+
 | 001| 1.9 | 0.0 | 2.0  | ...     |
 | 002| 0.0 | 1.0 | 0    | ...     |
 | 003| 0.0 | 9.0 | ...  | ...     |
  ...

我怎样才能执行这种转换? 提前谢谢你!! 朱利安。

【问题讨论】:

  • 嗨,如果答案有效或者您还有其他问题,请告诉我,谢谢
  • 是的 ags29,谢谢!!!

标签: dataframe pyspark


【解决方案1】:

您可以结合使用explodepivot

import pyspark.sql.functions as F

# explode to get "long" format
df=df.withColumn('exploded', F.explode('Q'))

# get the name and the name in separate columns
df=df.withColumn('name', F.col('exploded').getItem(0))
df=df.withColumn('value', F.col('exploded').getItem(1))

# now pivot
df.groupby('Id').pivot('name').agg(F.max('value')).na.fill(0)

【讨论】:

    【解决方案2】:

    非常有趣的问题。我就是这样处理的。

    test.csv

    001,pr1:0.9,pr3:1.2,pr2:2.0
    002,pr3:5.2,pr4:0.99
    

    派斯帕克

    file = sc.textFile("file:///test2.csv")
    
    //get it in (key,value)
    //[(u'001', u'pr1:0.9')...]
    
    //rdd1 = file.map(lambda r: r.replace(",","\t",1)).map(lambda r: r.split("\t")).map(lambda r: (r[0],r[1])).flatMapValues(lambda r: r.split(','))
    rdd1 = file.map(lambda r: r.split(",")[0]).map(lambda r: (r[0],r[1])).flatMapValues(lambda r: r.split(','))
    
    //create a DF with 3 columns
    //[(u'001', u'pr1', u'0.9')...)]
    +---+---+----+
    | _1| _2|  _3|
    +---+---+----+
    |001|pr1| 0.9|
    |001|pr3| 1.2|
    |001|pr2| 2.0|
    |002|pr3| 5.2|
    |002|pr4|0.99|
    +---+---+----+
    
    
    rdd2 = rdd1.map(lambda r: (r[0],r[1].split(":"))).map(lambda r: (r[0],r[1][0],r[1][1]))
    df = rdd2.toDF()
    
    
    //Perform the magic
    df.groupBy("_1").pivot("_2").agg(expr("coalesce(first(_3),0)"))
    
    
    +---+---+---+---+----+
    | _1|pr1|pr2|pr3| pr4|
    +---+---+---+---+----+
    |001|0.9|2.0|1.2|   0|
    |002|  0|  0|5.2|0.99|
    +---+---+---+---+----+
    

    【讨论】:

    • 谢谢巴拉,这是一个很好的解决方案。可能比 ags29 建议的时间长一点。
    猜你喜欢
    • 1970-01-01
    • 2019-08-05
    • 2022-09-24
    • 1970-01-01
    • 2022-11-12
    • 1970-01-01
    • 2021-09-22
    • 2019-03-02
    • 2019-12-17
    相关资源
    最近更新 更多