【问题标题】:Pyspark > Dataframe with multiple array columns into multiple rows with one value eachPyspark > Dataframe,将多个数组列分成多行,每行一个值
【发布时间】:2021-11-04 04:49:10
【问题描述】:

我们有一个 pyspark 数据框,其中有几列包含具有多个值的数组。我们的目标是将这些列的每个值放在几行中,并保留初始的不同列。 所以,从这样的开始:

data = [
    ("A", ["a", "c"], ["1", "5"]),
    ("B", ["a", "b"], None),
    ("C", [], ["1"]),
]

什么:

+---+------+------+
|id |list_a|list_b|
+---+------+------+
|A  |[a, c]|[1, 5]|
|B  |[a, b]|null  |
|C  |[]    |[1]   |
+---+------+------+

我们希望最终拥有:

+---+----+----+
|id |col |col |
+---+----+----+
|A  |a   |null|
|A  |c   |null|
|A  |null|1   |
|A  |null|5   |
|B  |a   |null|
|B  |b   |null|
|C  |null|1   |
+---+----+----+

我们正在考虑几种方法:

  1. 为每个值加上一个列指示符,将所有数组合并为一个数组,将其分解并将不同的值重新组织到不同的列中
  2. 将数据框拆分为多个,每个包含这些数组列之一,分解数组列,然后连接数据框

但所有这些都像肮脏、复杂、容易出错和低效的解决方法。

有没有人知道如何以优雅的方式解决这个问题?

【问题讨论】:

    标签: python dataframe apache-spark pyspark apache-spark-sql


    【解决方案1】:

    试试这个动态解决方案。

    输入:

    data = [
        ("A", ["a", "c"], ["1", "5"]),
        ("B", ["a", "b"], None),
        ("C", [], ["1"]),
    ]
    df=spark.createDataFrame(data,["id","list_a","list_b"])
    df.show(truncate=False)
    +---+------+------+
    |id |list_a|list_b|
    +---+------+------+
    |A  |[a, c]|[1, 5]|
    |B  |[a, b]|null  |
    |C  |[]    |[1]   |
    +---+------+------+
    

    让我们为 df 中的每个数组列创建一个 Dataframes 数组。 首先用空 Dataframe 初始化,然后在 for 循环中覆盖它。 对于每一列,展开它,对于所有其他列,将数据类型更改为带 NULL 的字符串。

    from pyspark.sql.types import *
    array_cols=df.columns[1:]  #just ignoring the ID column
    c=0
    dfarr=[spark.createDataFrame([],schema=StructType()) for i in array_cols ]
    for i in array_cols:
        dfarr[c]=df.withColumn(i,explode(col(i)))
        for j in array_cols:
            if(i!=j):
                dfarr[c]=dfarr[c].withColumn(j,expr(" cast(null as string) "))
        c=c+1
    

    现在,dfarr 是一个数据帧数组,其架构类似于

    dfarr[0].printSchema()
    root
     |-- id: string (nullable = true)
     |-- list_a: string (nullable = true)
     |-- list_b: string (nullable = true)
    
    dfarr[1].show(truncate=False)
    +---+------+------+
    |id |list_a|list_b|
    +---+------+------+
    |A  |null  |1     |
    |A  |null  |5     |
    |C  |null  |1     |
    +---+------+------+
    

    现在 dfarr 中的数据类型都相似,所以只需将它们合并即可。为此,我们需要来自 functools 的 reduce 函数

    from functools import reduce  
    from pyspark.sql import DataFrame
    
    def unionAll(*dfs):
        return reduce(DataFrame.unionByName, dfs) 
    

    申请我们的dfarr

    combo=unionAll(*dfarr)
    
    combo.show(truncate=False)
    +---+------+------+
    |id |list_a|list_b|
    +---+------+------+
    |A  |a     |null  |
    |A  |c     |null  |
    |B  |a     |null  |
    |B  |b     |null  |
    |A  |null  |1     |
    |A  |null  |5     |
    |C  |null  |1     |
    +---+------+------+
    

    【讨论】:

      【解决方案2】:

      如果 list_a 和 list_b 列都可能为空,我会在数据集中添加第 4 个案例

      data = [
          ("A", ["a", "c"], ["1", "5"]),
          ("B", ["a", "b"], None),
          ("C", [], ["1"]),
          ("D", None, None),
      ]
      df = spark.createDataFrame(data,["id","list_a","list_b"])
      

      然后我会将原始 df 拆分为 3(空值,list_a 已分解,list_b 已分解)并执行 unionByName

      dfnulls = df.filter(col("list_a").isNull() & col("list_b").isNull())\
          .withColumn("list_a", lit(None))\
          .withColumn("list_b", lit(None))
      
      df1 = df\
          .withColumn("list_a", explode_outer(col("list_a")))\
          .withColumn("list_b", lit(None))\
          .filter(~col("list_a").isNull())
      
      df2 = df\
          .withColumn("list_b", explode_outer(col("list_b")))\
          .withColumn("list_a", lit(None))\
          .filter(~col("list_b").isNull())
      
      merged_df = df1.unionByName(df2).unionByName(dfnulls)
      
      merged_df.show()
      
      +---+------+------+
      | id|list_a|list_b|
      +---+------+------+
      |  A|     a|  null|
      |  A|     c|  null|
      |  B|     a|  null|
      |  B|     b|  null|
      |  A|  null|     1|
      |  A|  null|     5|
      |  C|  null|     1|
      |  D|  null|  null|
      +---+------+------+
      

      【讨论】:

        【解决方案3】:

        以下方法可能会对您有所帮助,它基于 Scala

        基本上单独分解各个列表列并根据虚拟列连接数据集以获得所需的结果。

        import org.apache.spark.sql.functions.{explode_outer, col, lit, concat}
        
        
        val df1 = inputDF
          .withColumn("list_a", explode_outer(col("list_a")))
          .withColumn("random_join_col", concat(col("id"), lit("1")))
          .drop("list_b")
        
        val df2 = inputDF
          .withColumn("list_b", explode_outer(col("list_b")))
          .withColumn("random_join_col", concat(col("id"), lit("2")))
          .drop("list_a")
        
        
        val finalDF = df1.join(df2, Seq("id", "random_join_col"), "full_outer").drop("random_join_col")
        
        // Drop rows, if it got null value on both the list columns
        finalDF.na.drop(how = "all", Seq("list_a","list_b")).orderBy("id").show(false)
        

        【讨论】:

          猜你喜欢
          • 2019-11-25
          • 2017-04-22
          • 1970-01-01
          • 2021-08-31
          • 2017-02-27
          • 2018-04-08
          • 1970-01-01
          相关资源
          最近更新 更多