【问题标题】:Automatically and Elegantly flatten DataFrame in Spark SQL在 Spark SQL 中自动优雅地展平 DataFrame
【发布时间】:2016-09-25 01:51:12
【问题描述】:

全部,

是否有一种优雅且可接受的方式来展平具有嵌套 StructType 列的 Spark SQL 表(Parquet)

例如

如果我的架构是:

foo
 |_bar
 |_baz
x
y
z

如何在不求助于手动运行的情况下将其选择为扁平表格形式

df.select("foo.bar","foo.baz","x","y","z")

换句话说,我如何以编程方式仅给出StructTypeDataFrame 获得上述代码的结果

【问题讨论】:

  • 您是否尝试过使用explode DataFrame 方法?
  • 不要认为explode 会这样做。 explode 创建新行——他想添加列。我认为您需要使用 Column 对象。
  • 对不起,我错了。
  • 我的意思是,我确信我可以使用explode 来做到这一点——explode 实际上确实可以让您创建新列。我只是不认为它会很优雅——您可能必须为每条记录进行架构反射,而不是预先加载架构反射来只执行一次以创建 select(...)
  • 直接从databricks解决:github.com/delta-io/delta/blob/…

标签: scala apache-spark apache-spark-sql


【解决方案1】:

简短的回答是,没有“公认”的方法可以做到这一点,但您可以通过遍历DataFrame.schema 生成select(...) 语句的递归函数非常优雅地做到这一点。

递归函数应该返回一个Array[Column]。每次函数遇到StructType 时,它都会调用自己并将返回的Array[Column] 附加到自己的Array[Column] 中。

类似:

import org.apache.spark.sql.Column
import org.apache.spark.sql.types.StructType
import org.apache.spark.sql.functions.col

def flattenSchema(schema: StructType, prefix: String = null) : Array[Column] = {
  schema.fields.flatMap(f => {
    val colName = if (prefix == null) f.name else (prefix + "." + f.name)

    f.dataType match {
      case st: StructType => flattenSchema(st, colName)
      case _ => Array(col(colName))
    }
  })
}

然后你会像这样使用它:

df.select(flattenSchema(df.schema):_*)

【讨论】:

  • 谢谢,这似乎是一个非常合理的解决方案。
  • 使用此解决方案,我如何处理具有相同名称的最低级别子节点?例如,父元素 Foo 有子元素 Bar,父元素 Foz 也有一个单独的子元素,名为 Bar。从初始数据框中选择 Foo.Bar 和 Foz.Bar 时(数组由 flattenSchema 返回),我得到 2 列都命名为 Bar。但我希望列标题为Foo.BarFoo_Bar 或类似的东西。因此,它们中的每一个都是独一无二且明确的。
  • 上述方案适用于什么版本的Spark?在 Spark 2.1.0 (Java API) 中,看起来 StructField 的类型不可能是 StructType。
  • TheM00s3,你可以导入 org.apache.spark.sql.functions.col 它也应该在 Spark 2.1.x 中工作(目前只在 Scala 中尝试过,而不是 Java)
  • 以防万一其他人偶然发现:如果您希望新列名反映原始模式的嵌套结构:f1.nested1.nested2 ... 您应该在此行为列设置别名:case _ => Array(col(colName)) 应该变成case _ => Array(col(colName).alias(colName))
【解决方案2】:

只是想分享我的 Pyspark 解决方案 - 它或多或少是对 @David Griffin 解决方案的翻译,因此它支持任何级别的嵌套对象。

from pyspark.sql.types import StructType, ArrayType  

def flatten(schema, prefix=None):
    fields = []
    for field in schema.fields:
        name = prefix + '.' + field.name if prefix else field.name
        dtype = field.dataType
        if isinstance(dtype, ArrayType):
            dtype = dtype.elementType

        if isinstance(dtype, StructType):
            fields += flatten(dtype, prefix=name)
        else:
            fields.append(name)

    return fields


df.select(flatten(df.schema)).show()

【讨论】:

  • 我遇到了一个错误,可能是由于 JSON 模式嵌套严重,但我不完全确定这意味着什么:“无法解析 'item.productOrService.coding[ 'code']' 由于数据类型不匹配:参数 2 需要整数类型,但是,''code'' 是字符串类型。”有任何想法吗?我对 JSON 完全陌生,但我怀疑结构中的数组是个问题。
  • @user1983682 请打开您的案例,以便我们查看您的schema 的详细信息。
【解决方案3】:

我正在改进我之前的答案,并为我自己在已接受答案的 cmets 中陈述的问题提供解决方案。

这个公认的解决方案创建一个 Column 对象数组并使用它来选择这些列。在 Spark 中,如果你有一个嵌套的 DataFrame,你可以像这样选择子列:df.select("Parent.Child"),这会返回一个包含子列值的 DataFrame,并命名为 Child。但是,如果您对不同父结构的属性具有相同的名称,则您会丢失有关父结构的信息,并且最终可能会得到相同的列名,并且不能再按名称访问它们,因为它们是明确的。

这是我的问题。

我找到了解决问题的方法,也许它也可以帮助其他人。我分别打电话给flattenSchema

val flattenedSchema = flattenSchema(df.schema)

这会返回一个 Column 对象数组。我没有在select() 中使用它,这将返回一个DataFrame,其中的列由最后一级的子级命名,我将原始列名作为字符串映射到它们自己,然后在选择Parent.Child 列后,将其重命名为@ 987654327@ 而不是 Child(为了方便,我还用下划线替换了点):

val renamedCols = flattenedSchema.map(name => col(name.toString()).as(name.toString().replace(".","_")))

然后您可以使用原始答案中所示的选择功能:

var newDf = df.select(renamedCols:_*)

【讨论】:

  • 嗨@V。萨玛,这个解决方案很棒。但是,如果子属性具有相同的名称并且来自单个父属性,代码将是什么。 ?例如{“面糊”:[{“id”:“1001”,“类型”:“常规”},{“id”:“1002”,“类型”:“巧克力”},{“id”:“1003” , "type": "蓝莓" }, { "id": "1004", "type": "魔鬼的食物" } } }
  • @vsdaking 嗨,谢谢。我确定我已经解决了您需要来自 JSON 数组的数据的问题。不幸的是,一段时间过去了,我目前也无法访问 Spark 进行测试。具有相同名称的子属性不是问题,因为我认为它们需要是您最终 DF 的列名。您只需搜索如何在 Spark 中读取 JSON 数组。也许explode 命令会帮助你。
  • 感谢@V.Samma 我已经用它来解决我的问题,但是它创建了一个非常宽的数据框,我实际上需要我的嵌套结构类型作为我的数据框中的新行。对此的任何建议将不胜感激
  • @ukbaz 当然,它采用所有嵌套的子属性并将它们按模式展平,这实际上意味着它们现在是数据框的单独列。这就是我解决方案的目标。我很难理解你到底需要什么。如果您有列 IDPersonAddress 但架构类似于:“ID”、“Person.Name”、“Person.Age”、“Address.City”、“Address.Street”、“Address .Country”,然后通过展平,最初的 3 列创建 6 列。根据我的例子,你想要什么结果?
  • @mythic 不幸的是,我已经有几年没有与 Spark 合作了,但您似乎已经得到了帮助 :)
【解决方案4】:

==========编辑====

这里有一些针对更复杂模式的额外处理:https://medium.com/@lvhuyen/working-with-spark-dataframe-having-a-complex-schema-a3bce8c3f44

===================

PySpark,添加到@Evan V 的答案中,当您的字段名称具有特殊字符时,例如点“.”、连字符“-”、...:

from pyspark.sql.types import StructType, ArrayType  

def normalise_field(raw):
    return raw.strip().lower() \
            .replace('`', '') \
            .replace('-', '_') \
            .replace(' ', '_') \
            .strip('_')

def flatten(schema, prefix=None):
    fields = []
    for field in schema.fields:
        name = "%s.`%s`" % (prefix, field.name) if prefix else "`%s`" % field.name
        dtype = field.dataType
        if isinstance(dtype, ArrayType):
            dtype = dtype.elementType
        if isinstance(dtype, StructType):
            fields += flatten(dtype, prefix=name)
        else:
            fields.append(col(name).alias(normalise_field(name)))

    return fields

df.select(flatten(df.schema)).show()

【讨论】:

    【解决方案5】:

    您还可以使用 SQL 将列选择为平面。

    1. 获取原始数据框架构
    2. 通过浏览架构生成 SQL 字符串
    3. 查询您的原始数据框

    我用 Java 做了一个实现:https://gist.github.com/ebuildy/3de0e2855498e5358e4eed1a4f72ea48

    (也可以使用递归方式,我更喜欢SQL方式,所以你可以通过Spark-shell轻松测试)。

    【讨论】:

      【解决方案6】:

      我在开源spark-daria project 中添加了一个DataFrame#flattenSchema 方法。

      下面介绍了如何在代码中使用该函数。

      import com.github.mrpowers.spark.daria.sql.DataFrameExt._
      df.flattenSchema().show()
      
      +-------+-------+---------+----+---+
      |foo.bar|foo.baz|        x|   y|  z|
      +-------+-------+---------+----+---+
      |   this|     is|something|cool| ;)|
      +-------+-------+---------+----+---+
      

      您还可以使用flattenSchema() 方法指定不同的列名分隔符。

      df.flattenSchema(delimiter = "_").show()
      +-------+-------+---------+----+---+
      |foo_bar|foo_baz|        x|   y|  z|
      +-------+-------+---------+----+---+
      |   this|     is|something|cool| ;)|
      +-------+-------+---------+----+---+
      

      这个分隔符参数非常重要。如果您将架构展平以在 Redshift 中加载表,您将无法使用句点作为分隔符。

      这是生成此输出的完整代码 sn-p。

      val data = Seq(
        Row(Row("this", "is"), "something", "cool", ";)")
      )
      
      val schema = StructType(
        Seq(
          StructField(
            "foo",
            StructType(
              Seq(
                StructField("bar", StringType, true),
                StructField("baz", StringType, true)
              )
            ),
            true
          ),
          StructField("x", StringType, true),
          StructField("y", StringType, true),
          StructField("z", StringType, true)
        )
      )
      
      val df = spark.createDataFrame(
        spark.sparkContext.parallelize(data),
        StructType(schema)
      )
      
      df.flattenSchema().show()
      

      底层代码类似于 David Griffin 的代码(以防您不想将 spark-daria 依赖项添加到项目中)。

      object StructTypeHelpers {
      
        def flattenSchema(schema: StructType, delimiter: String = ".", prefix: String = null): Array[Column] = {
          schema.fields.flatMap(structField => {
            val codeColName = if (prefix == null) structField.name else prefix + "." + structField.name
            val colName = if (prefix == null) structField.name else prefix + delimiter + structField.name
      
            structField.dataType match {
              case st: StructType => flattenSchema(schema = st, delimiter = delimiter, prefix = colName)
              case _ => Array(col(codeColName).alias(colName))
            }
          })
        }
      
      }
      
      object DataFrameExt {
      
        implicit class DataFrameMethods(df: DataFrame) {
      
          def flattenSchema(delimiter: String = ".", prefix: String = null): DataFrame = {
            df.select(
              StructTypeHelpers.flattenSchema(df.schema, delimiter, prefix): _*
            )
          }
      
        }
      
      }
      

      【讨论】:

      • 我们可以添加对 Array 和 Array 的支持吗?
      【解决方案7】:

      这是一个函数,它可以做你想做的事情,它可以处理多个嵌套列,其中包含具有相同名称的列,带有前缀:

      from pyspark.sql import functions as F
      
      def flatten_df(nested_df):
          flat_cols = [c[0] for c in nested_df.dtypes if c[1][:6] != 'struct']
          nested_cols = [c[0] for c in nested_df.dtypes if c[1][:6] == 'struct']
      
          flat_df = nested_df.select(flat_cols +
                                     [F.col(nc+'.'+c).alias(nc+'_'+c)
                                      for nc in nested_cols
                                      for c in nested_df.select(nc+'.*').columns])
          return flat_df
      

      之前:

      root
       |-- x: string (nullable = true)
       |-- y: string (nullable = true)
       |-- foo: struct (nullable = true)
       |    |-- a: float (nullable = true)
       |    |-- b: float (nullable = true)
       |    |-- c: integer (nullable = true)
       |-- bar: struct (nullable = true)
       |    |-- a: float (nullable = true)
       |    |-- b: float (nullable = true)
       |    |-- c: integer (nullable = true)
      

      之后:

      root
       |-- x: string (nullable = true)
       |-- y: string (nullable = true)
       |-- foo_a: float (nullable = true)
       |-- foo_b: float (nullable = true)
       |-- foo_c: integer (nullable = true)
       |-- bar_a: float (nullable = true)
       |-- bar_b: float (nullable = true)
       |-- bar_c: integer (nullable = true)
      

      【讨论】:

        【解决方案8】:

        要结合 David Griffen 和 V. Samma 的答案,您可以这样做以展平,同时避免重复的列名:

        import org.apache.spark.sql.types.StructType
        import org.apache.spark.sql.Column
        import org.apache.spark.sql.DataFrame
        
        def flattenSchema(schema: StructType, prefix: String = null) : Array[Column] = {
          schema.fields.flatMap(f => {
            val colName = if (prefix == null) f.name else (prefix + "." + f.name)
            f.dataType match {
              case st: StructType => flattenSchema(st, colName)
              case _ => Array(col(colName).as(colName.replace(".","_")))
            }
          })
        }
        
        def flattenDataFrame(df:DataFrame): DataFrame = {
            df.select(flattenSchema(df.schema):_*)
        }
        
        var my_flattened_json_table = flattenDataFrame(my_json_table)
        

        【讨论】:

          【解决方案9】:

          如果您使用的是嵌套结构和数组,请对上面的代码做一点补充。

          def flattenSchema(schema: StructType, prefix: String = null) : Array[Column] = {
              schema.fields.flatMap(f => {
                val colName = if (prefix == null) f.name else (prefix + "." + f.name)
          
                f match {
                  case StructField(_, struct:StructType, _, _) => flattenSchema(struct, colName)
                  case StructField(_, ArrayType(x :StructType, _), _, _) => flattenSchema(x, colName)
                  case StructField(_, ArrayType(_, _), _, _) => Array(col(colName))
                  case _ => Array(col(colName))
                }
              })
            }
          
          

          【讨论】:

          • 我正在尝试将此逻辑实现到 Evan V 给出的 spark 建议中,但似乎无法为 Array 类型中的 Struct 获取正确的代码——如果有人有想法,我将不胜感激。
          • 我们可以在展平架构的同时添加扫描深度吗?
          • 我正在尝试使用它,但它没有提供正确的输入。我有 a、a.b、a.b.c、a.b.d 但它没有对最后一个子级别进行展平
          【解决方案10】:

          我一直在使用一个内衬,这会产生一个扁平的架构,其中包含 5 列 bar、baz、x、y、z:

          df.select("foo.*", "x", "y", "z")
          

          至于explode:我通常保留explode 用于展平列表。例如,如果您有一列 idList 是字符串列表,您可以这样做:

          df.withColumn("flattenedId", functions.explode(col("idList")))
            .drop("idList")
          

          这将产生一个新的 Dataframe,其中包含一个名为 flattenedId 的列(不再是列表)

          【讨论】:

            【解决方案11】:

            这是对解决方案的修改,但它使用 tailrec 表示法

            
              @tailrec
              def flattenSchema(
                  splitter: String,
                  fields: List[(StructField, String)],
                  acc: Seq[Column]): Seq[Column] = {
                fields match {
                  case (field, prefix) :: tail if field.dataType.isInstanceOf[StructType] =>
                    val newPrefix = s"$prefix${field.name}."
                    val newFields = field.dataType.asInstanceOf[StructType].fields.map((_, newPrefix)).toList
                    flattenSchema(splitter, tail ++ newFields, acc)
            
                  case (field, prefix) :: tail =>
                    val colName = s"$prefix${field.name}"
                    val newCol  = col(colName).as(colName.replace(".", splitter))
                    flattenSchema(splitter, tail, acc :+ newCol)
            
                  case _ => acc
                }
              }
              def flattenDataFrame(df: DataFrame): DataFrame = {
                val fields = df.schema.fields.map((_, ""))
                df.select(flattenSchema("__", fields.toList, Seq.empty): _*)
              }
            

            【讨论】:

              【解决方案12】:

              这是基于@Evan V 处理嵌套更重的Json 文件的解决方案。 对我来说,原始解决方案的问题是当有一个 ArrayType 嵌套在另一个 ArrayType 中时,我得到了一个错误。

              例如,如果 Json 看起来像:

              {"e":[{"f":[{"g":"h"}]}]}
              

              我会得到一个错误:

              "cannot resolve '`e`.`f`['g']' due to data type mismatch: argument 2 requires integral type
              

              为了解决这个问题,我稍微修改了代码,我同意这看起来非常愚蠢,只是在这里发布它,以便有人可以提出更好的解决方案。

              def flatten(schema, prefix=None):
                  fields = []
                  for field in schema.fields:
                      name = prefix + '.' + field.name if prefix else field.name
                      dtype = field.dataType
                      if isinstance(dtype, T.StructType):
                          fields += flatten(dtype, prefix=name)
                      else:
                          fields.append(name)
              
                  return fields
              
              
              def explodeDF(df):
                  for (name, dtype) in df.dtypes:
                      if "array" in dtype:
                          df = df.withColumn(name, F.explode(name))
              
                  return df
              
              def df_is_flat(df):
                  for (_, dtype) in df.dtypes:
                      if ("array" in dtype) or ("struct" in dtype):
                          return False
              
                  return True
              
              def flatJson(jdf):
                  keepGoing = True
                  while(keepGoing):
                      fields = flatten(jdf.schema)
                      new_fields = [item.replace(".", "_") for item in fields]
                      jdf = jdf.select(fields).toDF(*new_fields)
                      jdf = explodeDF(jdf)
                      if df_is_flat(jdf):
                          keepGoing = False
              
                  return jdf
              

              用法:

              df = spark.read.json(path_to_json)
              flat_df = flatJson(df)
              
              flat_df.show()
              +---+---+-----+
              |  a|e_c|e_f_g|
              +---+---+-----+
              |  b|  d|    h|
              +---+---+-----+
              

              【讨论】:

                【解决方案13】:
                import org.apache.spark.sql.SparkSession
                import org.apache.spark.SparkConf
                import org.apache.spark.sql.types.StructType
                import scala.collection.mutable.ListBuffer 
                val columns=new ListBuffer[String]()
                
                def flattenSchema(schema:StructType,prefix:String=null){
                for(i<-schema.fields){
                  if(i.dataType.isInstanceOf[StructType]) {
                    val columnPrefix = i.name + "."
                    flattenSchema(i.dataType.asInstanceOf[StructType], columnPrefix)
                  }
                  else {
                    if(prefix == null)
                      columns.+=(i.name)
                    else
                      columns.+=(prefix+i.name)
                  }
                  }
                }
                

                【讨论】:

                  【解决方案14】:

                  结合了 Evan V、Avrell 和 Steco 的想法。 在 PySpark 中使用 '`' 处理带有特殊字符的查询字段时,我还提供了完整的 SQL 语法。

                  下面的解决方案给出了以下,

                  1. 处理嵌套的 JSON 架构。
                  2. 跨嵌套列处理相同的列名(我们将给出整个层次结构的别名,用下划线分隔)。
                  3. 处理特殊字符。 (我们用 '' 处理特殊字符,我没有处理连续出现的 '' 但我们也可以通过适当的 'sub' 替换来处理)
                  4. 为我们提供 SQL 语法。
                  5. 查询字段包含在“`”中。

                  代码sn-p如下,

                  df=spark.read.json('<JSON FOLDER / FILE PATH>')
                  df.printSchema()
                  from pyspark.sql.types import StructType, ArrayType
                  
                  def flatten(schema, prefix=None):
                      fields = []
                      for field in schema.fields:
                          name = prefix + '.' + field.name if prefix else field.name
                          dtype = field.dataType
                          if isinstance(dtype, ArrayType):
                              dtype = dtype.elementType
                          
                          if isinstance(dtype, StructType):
                              fields += flatten(dtype, prefix=name)
                          else:
                              alias_name=name.replace('.','_').replace(' ','_').replace('(','').replace(')','').replace('-','_').replace('&','_').replace(r'(_){2,}',r'\1')
                              name=name.replace('.','`.`')
                              field_name = "`" + name + "`" + " AS " + alias_name
                              fields.append(field_name)
                      return fields
                  
                  df.createOrReplaceTempView("to_flatten_df")
                  query_fields=flatten(df.schema)
                  
                  def listToString(s):  
                      
                      # initialize an empty string 
                      str1 = ""
                      # traverse in the string   
                      for ele in s:  
                          str1 = str1 + ele + ','
                      # return string   
                      return str1  
                  
                  spark.sql("SELECT " + listToString(query_fields)[:-1] + " FROM to_flatten_df" ).show()
                  

                  【讨论】:

                    猜你喜欢
                    • 1970-01-01
                    • 2016-05-03
                    • 1970-01-01
                    • 1970-01-01
                    • 2019-04-01
                    • 1970-01-01
                    • 1970-01-01
                    • 2019-04-06
                    • 2019-10-08
                    相关资源
                    最近更新 更多