【问题标题】:How to create Spark DataFrame from RDD[Row] when Row contains complex types当 Row 包含复杂类型时,如何从 RDD[Row] 创建 Spark DataFrame
【发布时间】:2019-08-07 22:58:05
【问题描述】:

我有一个RDD[HbaseRecord],其中包含一个自定义复杂类型Name。这两个类的定义如下:

class HbaseRecord(
      val uuid: String,
      val timestamp: String,
      val name: Name
)

class Name(    
    val firstName:                String,     
    val middleName:               String,       
    val lastName:                 String
)

在我的代码中的某个时刻,我想从那个 RDD 生成一个 DataFrame,所以我可以将它保存为一个 avro 文件。我尝试了以下方法:

//I get an Object from Hbase here
val objectRDD : RDD[HbaseRecord] = ... 

//I convert the RDD[HbaseRecord] into RDD[Row]
val rowRDD : RDD[Row] = objectRDD .map(
    hbaseRecord => {
      val uuid : String = hbaseRecord.uuid
      val timestamp : String = hbaseRecord.timestamp
      val name : Name = hbaseRecord.name

      Row(uuid, timestamp, name)
    })

//Here I define the schema
   val schema = new StructType()
                  .add("uuid",StringType)
                  .add("timestamp", StringType)
                  .add("name", new StructType()
                                  .add("firstName",StringType)
                                  .add("middleName",StringType)
                                  .add("lastName",StringType)

//Now I try to create a Dataframe using the RDD[Row] and the schema
val dataFrame = sqlContext.createDataFrame(rowRDD , schema)

但我收到以下错误:

scala.MatchError: (of class java.lang.String) at org.apache.spark.sql.catalyst.CatalystTypeConverters$StructConverter.toCatalystImpl(CatalystTypeConverters.scala:255) 在 org.apache.spark.sql.catalyst.CatalystTypeConverters$StructConverter.toCatalystImpl(CatalystTypeConverters.scala:250) 在 org.apache.spark.sql.catalyst.CatalystTypeConverters$CatalystTypeConverter.toCatalyst(CatalystTypeConverters.scala:102) 在 org.apache.spark.sql.catalyst.CatalystTypeConverters$StructConverter.toCatalystImpl(CatalystTypeConverters.scala:260) 在 org.apache.spark.sql.catalyst.CatalystTypeConverters$StructConverter.toCatalystImpl(CatalystTypeConverters.scala:250) 在 org.apache.spark.sql.catalyst.CatalystTypeConverters$CatalystTypeConverter.toCatalyst(CatalystTypeConverters.scala:102) 在 org.apache.spark.sql.catalyst.CatalystTypeConverters$$anonfun$createToCatalystConverter$2.apply(CatalystTypeConverters.scala:401) 在 org.apache.spark.sql.SQLContext$$anonfun$6.apply(SQLContext.scala:492) 在 org.apache.spark.sql.SQLContext$$anonfun$6.apply(SQLContext.scala:492) 在 scala.collection.Iterator$$anon$11.next(Iterator.scala:328) 在 scala.collection.Iterator$$anon$11.next(Iterator.scala:328) 在 scala.collection.Iterator$$anon$10.next(Iterator.scala:312) at scala.collection.Iterator$class.foreach(Ite​​rator.scala:727) 在 scala.collection.AbstractIterator.foreach(Ite​​rator.scala:1157) 在 scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:48) 在 scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:103) 在 scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:47) 在 scala.collection.TraversableOnce$class.to(TraversableOnce.scala:27​​3) 在 scala.collection.AbstractIterator.to(Iterator.scala:1157) 在 scala.collection.TraversableOnce$class.toBuffer(TraversableOnce.scala:265) 在 scala.collection.AbstractIterator.toBuffer(Iterator.scala:1157) 在 scala.collection.TraversableOnce$class.toArray(TraversableOnce.scala:252) 在 scala.collection.AbstractIterator.toArray(Iterator.scala:1157) 在 org.apache.spark.sql.execution.SparkPlan$$anonfun$5.apply(SparkPlan.scala:212) 在 org.apache.spark.sql.execution.SparkPlan$$anonfun$5.apply(SparkPlan.scala:212) 在 org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1858) 在 org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1858) 在 org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66) 在 org.apache.spark.scheduler.Task.run(Task.scala:89) 在 org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:213) 在 java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) 在 java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) 在 java.lang.Thread.run(Thread.java:745)

我尝试从 Row 中删除复杂类型,所以它将是 Row[String, String],然后没有错误。所以我认为问题出在复杂类型上。

我做错了什么?或者我可以采用什么其他方法来生成具有复杂类型的 DataFrame?

【问题讨论】:

    标签: scala apache-spark


    【解决方案1】:

    我只是为此使用了简单的case class 而不是类。 name 列不符合定义的架构。 将 name 列转换为 Row 类型,它应该可以工作。

    val rowRDD : RDD[Row] = objectRDD .map(
        hbaseRecord => {
          val uuid : String = hbaseRecord.uuid
          val timestamp : String = hbaseRecord.timestamp
          val name = Row(hbaseRecord.name.firstName,
                         hbaseRecord.name.middleName,hbaseRecord.name.lastName)
          Row(uuid, timestamp, name)
        })
    

    【讨论】:

    • 谢谢,我会试试的。关于生成数据框的架构,我应该改变什么吗??
    • 不,但这取决于您的需要。检查输出,然后你可以决定。
    • 谢谢!有效。我也想问你,如果是一组地图,会怎么样?我猜 Row 会继续代表名称映射,但我怎么能指定有一个名称数组?
    • 请发布一个不同的问题和您的期望(数据、架构和输出)。
    猜你喜欢
    • 2019-08-09
    • 2017-06-13
    • 1970-01-01
    • 2016-08-28
    • 1970-01-01
    • 2022-09-27
    • 2019-02-17
    • 1970-01-01
    • 2020-08-24
    相关资源
    最近更新 更多