【问题标题】:Scala-Spark flattening nested schema contains arrayScala-Spark 扁平化嵌套模式包含数组
【发布时间】:2018-07-17 02:30:21
【问题描述】:

我有一个包含数组的嵌套模式:

 root
  |-- alarm_time: string (nullable = true)
  |-- alarm_id: string (nullable = true)
  |-- user: struct (nullable = true)
  |    |-- name: string (nullable = true)
  |    |-- family: string (nullable = true)
  |    |-- address: struct (nullable = true)
  |    |    |-- postalcode: string (nullable = true)
  |    |    |-- line1: string (nullable = true)
  |    |    |-- city: string (nullable = true)
  |    |    |-- country: string (nullable = true) 
  |-- device: struct (nullable = true)
  |    |-- device_usage: string (nullable = true)
  |    |-- device_id: string (nullable = true)  
  |-- alarm_info: struct (nullable = true)
  |    |-- type: string (nullable = true)
  |    |-- reason: string (nullable = true)
  |    |-- data: struct (nullable = true)
  |    |    |-- alarm_severity: long (nullable = true)
  |    |    |-- extra_info: array (nullable = true)
  |    |    |    |-- element: struct (containsNull = true)
  |    |    |    |    |-- producer: string (nullable = true)
  |    |    |    |    |-- comment: string (nullable = true)

我过去常常忽略数组字段并使用此代码来扁平化我的架构:

def flattenSchema(schema: StructType, prefix: String = null) : Array[Column] = {
  schema.fields.flatMap(f => {
    val colName = if (prefix == null) f.name else (prefix + "." + f.name)

    f.dataType match {
      case st: StructType => flattenSchema(st, colName)
      case _ => Array(col(colName))
    }
  })
}

并像df.select(flattenSchema(df.schema):_*) 一样使用它,但现在我有一个用例也需要保留数组数据,我唯一能想到的就是分解数组并保留多行但我没有运气。由于我将列作为 args 参数传递,因此我无法传递另一个参数。

我怎样才能做到这一点(使用展开数组的扁平化架构)?

【问题讨论】:

  • 所以你想在数组列上调用explode 并在同一个select 中展平结构?还是我误会了什么?
  • @Shaido 是的,你是对的,我想为数组的每个元素单独设置一列
  • 您想为每个元素设置单独的列或行吗?使用explode,您将在自己的行而不是列上获得元素。
  • @Shaido 我想为数组的每个元素获取单独的列(在爆炸数组之后)。我的意思是,如果我有一个包含两个数组的行,它应该为每个数组元素生成 2 行,但每个数组元素应该有单独的列(生产者,评论)
  • 我明白了,在这种情况下,你不能先爆炸数组,然后在新的数据帧上使用flattenSchema 吗?在这种情况下,数组已经被分解成多行,每行都包含一个结构(每个都有两个元素,生产者和注释),并且您的方法应该能够展平它。

标签: scala apache-spark apache-spark-sql


【解决方案1】:

所以我现在(Spark 2.2+)正在做的是检查架构是否嵌套并一遍又一遍地调用flattenschema,直到它变平。

  def makeItFlat(df: DataFrame): DataFrame = {
    if (isSchemaNested(df)) {
      val flattenedSchema = flattenSchema(df.schema)
      makeItFlat(df.select(flattenedSchema: _*))
    }
    else {
      df
    }
  }

makeItFlat() 是一种递归方法,用于检查模式是否未展平但再次递归调用 flattenschema

  def isSchemaNested(df: DataFrame): Boolean = {
    df.schema.fields.flatMap(field => {

      field.dataType match {
        case arrayType: ArrayType => {
          Array(true)
        }
        case mapType: MapType => {
          Array(true)
        }
        case structType: StructType => {
          Array(true)
        }
        case _ => {
          Array(false)
        }
      }
    }).exists(b => b)
  }

isSchemaNested 的工作是检查模式定义中是否存在任何嵌套数据类型

  private def flattenSchema(schema: StructType, prefix: String = null): Array[Column] = {
    schema.fields.flatMap(field => {
      val columnName = if (prefix == null) field.name else prefix + "." + field.name

      field.dataType match {
        case arrayType: ArrayType => {

          Array[Column](explode_outer(col(columnName)).as(columnName.replace(".", "_")))
        }
        case mapType: MapType => {
          None
        }
        case structType: StructType => {

          flattenSchema(structType, columnName)
        }
        case _ => {
          val columnNameWithUnderscores = columnName.replace(".", "_")

          val metadata = new MetadataBuilder().putString("encoding", "ZSTD").build()

          Array(col(columnName).as(columnNameWithUnderscores, metadata))
        }
      }
    }).filter(field => field != None)
  }

【讨论】:

    【解决方案2】:

    Am1rr3zA,如果我们在同一级别有两个数组,您提供的解决方案将会中断。它不允许同时发生两个爆炸:“每个选择子句只允许一个生成器,但发现了 2 个:explode(_1),explode(_2)”

    我已更新解决方案以跟踪嵌套中的复杂类型

      def flattenDataFrame(df: DataFrame): DataFrame = {
    
    var flattenedDf: DataFrame = df
    
    if (isNested(df)) {
    
      val flattenedSchema: Array[(Column, Boolean)] = flattenSchema(df.schema)
      var simpleColumns: List[Column] = List.empty[Column]
      var complexColumns: List[Column] = List.empty[Column]
    
      flattenedSchema.foreach {
        case (col, isComplex) => {
          if (isComplex) {
            complexColumns = complexColumns :+ col
          } else {
            simpleColumns = simpleColumns :+ col
          }
        }
      }
    
      var crossJoinedDataFrame = df.select(simpleColumns: _*)
      complexColumns.foreach(col => {
        crossJoinedDataFrame = crossJoinedDataFrame.crossJoin(df.select(col))
        crossJoinedDataFrame = flattenDataFrame(crossJoinedDataFrame)
      })
      crossJoinedDataFrame
    } else {
      flattenedDf
    }
    

    }

    private def flattenSchema(schema: StructType, prefix: String = null): Array[(Column, Boolean)] = {
    
    schema.fields.flatMap(field => {
    
      val columnName = if (prefix == null) field.name else prefix + "." + field.name
      field.dataType match {
        case arrayType: ArrayType => {
          val cols: Array[(Column, Boolean)] = Array[(Column, Boolean)](((explode_outer(col(columnName)).as(columnName.replace(".", "_"))), true))
          cols
        }
        case structType: StructType => {
          flattenSchema(structType, columnName)
        }
        case _ => {
          val columnNameWithUnderscores = columnName.replace(".", "_")
          val metadata = new MetadataBuilder().putString("encoding", "ZSTD").build()
          Array(((col(columnName).as(columnNameWithUnderscores, metadata)), false))
        }
      }
    }).filter(field => field != None)
    

    }

    def isNested(df: DataFrame): Boolean = {
    df.schema.fields.flatMap(field => {
    
      field.dataType match {
        case arrayType: ArrayType => {
          Array(true)
        }
        case mapType: MapType => {
          Array(true)
        }
        case structType: StructType => {
          Array(true)
        }
        case _ => {
          Array(false)
        }
      }
    }).exists(b => b)
    

    }

    【讨论】:

    • isNested 定义在哪里?
    • @PaulReiners,我只提到了我从原始解决方案中触及的部分。我现在在评论中添加了 isNested 定义...?
    • 这是一个很好的例子。真的有助于压平它。我的要求变化很小。如果有人可以为此提供建议。假设我有如下架构 { a |-b | |-c }。检查它-> a 是主标签,b 是标签,b 下,c 是标签。 c 有两条记录。使用此方法从一条记录中生成数据框中的两条记录。如果我需要一个记录,其中两个标签值由 ; 分隔那我该如何申请呢?
    猜你喜欢
    • 1970-01-01
    • 1970-01-01
    • 2023-04-08
    • 1970-01-01
    • 2017-03-19
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2021-08-08
    相关资源
    最近更新 更多