【问题标题】:Spark: Programmatic schema dynamic column mapping(Spark:以编程方式指定模式时的动态列映射)
【发布时间】:2017-03-29 12:17:16
【问题描述】:

我正在尝试进一步了解 Spark SQL 示例中的 runProgrammaticSchemaExample,但无法让它处理动态的列数。请参阅下面的代码,其中唯一的改动是用 for 循环为 Row 指定列映射。

  /**
   * Builds a DataFrame from a text file using a schema generated at runtime from a
   * space-separated string of column names, registers it as the temporary view
   * "people", prints its schema and shows the result of a SQL query over the view.
   *
   * NOTE(review): as written, the Row construction in this method does not map one
   * value per column (see the note on the `rowRDD` step), which matches the
   * "scala.runtime.BoxedUnit is not a valid external type for schema of string"
   * failure reported for this code.
   *
   * @param spark the session used to read the file, create the DataFrame and run SQL
   */
  private def runProgrammaticSchemaExample(spark: SparkSession): Unit = {
    // presumably supplies the implicit Encoder needed by `results.map` further down — TODO confirm
    import spark.implicits._
    // $example on:programmatic_schema$
    // Create an RDD with one element per line of the input text file
    val peopleRDD = spark.sparkContext.textFile("examples/src/main/resources/people.txt")

    // The schema is encoded in a string: column names separated by a single space
    val schemaString = "name age"

    // Generate the schema from the string: one nullable StringType field per column name
    val fields = schemaString.split(" ")
      .map(fieldName => StructField(fieldName, StringType, nullable = true))
    val schema = StructType(fields)

    // Convert records of the RDD (people) to Rows by splitting each line on ","
    val rowRDD = peopleRDD
      .map(_.split(","))
     // Original fixed two-column mapping, kept for reference:
     //       .map(attributes => Row(attributes(0), attributes(1).trim))
     // NOTE(review): this `for` has no `yield`, so the whole expression evaluates to
     // Unit and the loop body's values are discarded. Row(...) therefore receives a
     // single Unit value rather than one String per column.
      .map(attributes => Row(for (i <- 0 to (attributes.length -1 )){attributes(i)}))

    // Apply the schema to the RDD
    val peopleDF = spark.createDataFrame(rowRDD, schema)

    // Creates a temporary view named "people" using the DataFrame
    peopleDF.createOrReplaceTempView("people")
    peopleDF.printSchema()
    // SQL can be run over a temporary view created using DataFrames
    val results = spark.sql("SELECT name FROM people")

    // The results of SQL queries are DataFrames and support all the normal RDD operations
    // The columns of a row in the result can be accessed by field index or by field name
    results.map(attributes => "Name: " + attributes(0)).show()
    // Expected output:
    // +-------------+
    // |        value|
    // +-------------+
    // |Name: Michael|
    // |   Name: Andy|
    // | Name: Justin|
    // +-------------+
    // $example off:programmatic_schema$
  }
}

这是我得到的错误

16/11/15 09:31:06 ERROR Executor: Exception in task 0.0 in stage 0.0 (TID 0)
java.lang.RuntimeException: Error while encoding: java.lang.RuntimeException: scala.runtime.BoxedUnit is not a valid external type for schema of string
if (assertnotnull(input[0, org.apache.spark.sql.Row, true], top level row object).isNullAt) null else staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, validateexternaltype(getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true], top level row object), 0, name), StringType), true) AS name#0
+- if (assertnotnull(input[0, org.apache.spark.sql.Row, true], top level row object).isNullAt) null else staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, validateexternaltype(getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true], top level row object), 0, name), StringType), true)
   :- assertnotnull(input[0, org.apache.spark.sql.Row, true], top level row object).isNullAt
   :  :- assertnotnull(input[0, org.apache.spark.sql.Row, true], top level row object)
   :  :  +- input[0, org.apache.spark.sql.Row, true]
   :  +- 0
   :- null
   +- staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, validateexternaltype(getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true], top level row object), 0, name), StringType), true)
      +- validateexternaltype(getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true], top level row object), 0, name), StringType)
         +- getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true], top level row object), 0, name)
            +- assertnotnull(input[0, org.apache.spark.sql.Row, true], top level row object)
               +- input[0, org.apache.spark.sql.Row, true]

if (assertnotnull(input[0, org.apache.spark.sql.Row, true], top level row object).isNullAt) null else staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, validateexternaltype(getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true], top level row object), 1, age), StringType), true) AS age#1
+- if (assertnotnull(input[0, org.apache.spark.sql.Row, true], top level row object).isNullAt) null else staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, validateexternaltype(getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true], top level row object), 1, age), StringType), true)
   :- assertnotnull(input[0, org.apache.spark.sql.Row, true], top level row object).isNullAt
   :  :- assertnotnull(input[0, org.apache.spark.sql.Row, true], top level row object)
   :  :  +- input[0, org.apache.spark.sql.Row, true]
   :  +- 1
   :- null
   +- staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, validateexternaltype(getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true], top level row object), 1, age), StringType), true)
      +- validateexternaltype(getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true], top level row object), 1, age), StringType)
         +- getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true], top level row object), 1, age)
            +- assertnotnull(input[0, org.apache.spark.sql.Row, true], top level row object)
               +- input[0, org.apache.spark.sql.Row, true]

    at org.apache.spark.sql.catalyst.encoders.ExpressionEncoder.toRow(ExpressionEncoder.scala:279)
    at org.apache.spark.sql.SparkSession$$anonfun$5.apply(SparkSession.scala:537)
    at org.apache.spark.sql.SparkSession$$anonfun$5.apply(SparkSession.scala:537)
    at scala.collection.Iterator$$anon$11.next(Iterator.scala:409)

【问题讨论】:

    标签: apache-spark dataframe schema row


    【解决方案1】:

    Daniel Allen 的回答很好,但我发现它有些问题,我对其进行了修改并且它有效:

    // Number of columns declared by the space-separated schema string.
    val fieldCount = schemaString.split(" ").length

    /**
     * Builds a Row with exactly `size` string columns from the fields of one record.
     *
     * Fields beyond `size` are ignored. If the record has fewer than `size` fields
     * (for example because String.split dropped trailing empty fields), the missing
     * columns are filled with null instead of throwing
     * ArrayIndexOutOfBoundsException, so the target schema columns must be nullable.
     *
     * @param x    the split fields of a single input record
     * @param size the number of columns the resulting Row must have
     * @return a Row holding the first `size` fields of `x`, null-padded if needed
     */
    def getARow(x : Array[String], size : Int) : Row={
      // null is how Row represents a missing value; it is only used for absent fields.
      val columns = Seq.tabulate(size)(i => if (i < x.length) x(i) else null)
      Row.fromSeq(columns)
    }
    
    // One nullable StringType column per space-separated name in schemaString.
    val fields = for (columnName <- schemaString.split(" "))
      yield StructField(columnName, StringType, nullable = true)
    val schema = StructType(fields)
    // Split each input line on "," and turn the pieces into a Row of fieldCount columns.
    val rowRDD = peopleRDD.map { line =>
      val parts = line.split(",")
      getARow(parts, fieldCount)
    }
    // Attach the generated schema to the rows.
    val peopleDF = spark.createDataFrame(rowRDD, schema)
    

    【讨论】:

      【解决方案2】:

      我遇到了与您完全相同的问题,并尝试以相同的方式解决它,最初导致相同的错误:) 我对 scala 很陌生,但设法想出了一个生成该行对象的函数(只需要传递你的字段计数)。

      获取字段数:

      // Field count is taken from the first record only: split it on "\u0001" and count the pieces.
      val fieldCount = rdd.map { line => line.split("\u0001").length }.first()
      

      生成行对象函数:

      /**
       * Builds a Row with exactly `size` string columns from the fields of one record.
       *
       * `size` is the field count (the number of columns), so only indices
       * 0 until size are read. The previous version iterated `0 to size` over an
       * array of `size + 1` slots, which read `x(size)` and threw
       * ArrayIndexOutOfBoundsException whenever `size` equalled the record's length.
       *
       * If the record has fewer than `size` fields, the missing columns are filled
       * with null, so the corresponding schema columns must be nullable.
       *
       * @param x    the split fields of a single input record
       * @param size the number of columns the resulting Row must have
       * @return a Row holding the first `size` fields of `x`, null-padded if needed
       */
      def getARow(x : Array[String], size : Int) : Row={
        // null is how Row represents a missing value; it is only used for absent fields.
        val columns = Seq.tabulate(size)(i => if (i < x.length) x(i) else null)
        Row.fromSeq(columns)
      }
      

      使用您的 rdd 和架构创建数据框

      // Split every line on `delim`, convert the pieces to a Row of fieldCount columns,
      // then apply mySchema to the resulting rows.
      val myDF = sqlContext.createDataFrame(
        rdd.map(line => getARow(line.split(delim), fieldCount)),
        mySchema)
      

      希望这对某人有所帮助!

      【讨论】:

        猜你喜欢
        • 2021-12-21
        • 1970-01-01
        • 1970-01-01
        • 2022-01-03
        • 1970-01-01
        • 1970-01-01
        • 1970-01-01
        • 2018-08-11
        相关资源
        最近更新 更多