【发布时间】:2019-08-07 22:58:05
【问题描述】:
我有一个RDD[HbaseRecord],其中包含一个自定义复杂类型Name。这两个类的定义如下:
class HbaseRecord(
val uuid: String,
val timestamp: String,
val name: Name
)
class Name(
val firstName: String,
val middleName: String,
val lastName: String
)
在我的代码中的某个时刻,我想从那个 RDD 生成一个 DataFrame,所以我可以将它保存为一个 avro 文件。我尝试了以下方法:
//I get an Object from Hbase here
val objectRDD : RDD[HbaseRecord] = ...
//I convert the RDD[HbaseRecord] into RDD[Row]
val rowRDD : RDD[Row] = objectRDD .map(
hbaseRecord => {
val uuid : String = hbaseRecord.uuid
val timestamp : String = hbaseRecord.timestamp
val name : Name = hbaseRecord.name
Row(uuid, timestamp, name)
})
//Here I define the schema
val schema = new StructType()
.add("uuid",StringType)
.add("timestamp", StringType)
.add("name", new StructType()
.add("firstName",StringType)
.add("middleName",StringType)
.add("lastName",StringType)
//Now I try to create a Dataframe using the RDD[Row] and the schema
val dataFrame = sqlContext.createDataFrame(rowRDD , schema)
但我收到以下错误:
scala.MatchError: (of class java.lang.String) at org.apache.spark.sql.catalyst.CatalystTypeConverters$StructConverter.toCatalystImpl(CatalystTypeConverters.scala:255) 在 org.apache.spark.sql.catalyst.CatalystTypeConverters$StructConverter.toCatalystImpl(CatalystTypeConverters.scala:250) 在 org.apache.spark.sql.catalyst.CatalystTypeConverters$CatalystTypeConverter.toCatalyst(CatalystTypeConverters.scala:102) 在 org.apache.spark.sql.catalyst.CatalystTypeConverters$StructConverter.toCatalystImpl(CatalystTypeConverters.scala:260) 在 org.apache.spark.sql.catalyst.CatalystTypeConverters$StructConverter.toCatalystImpl(CatalystTypeConverters.scala:250) 在 org.apache.spark.sql.catalyst.CatalystTypeConverters$CatalystTypeConverter.toCatalyst(CatalystTypeConverters.scala:102) 在 org.apache.spark.sql.catalyst.CatalystTypeConverters$$anonfun$createToCatalystConverter$2.apply(CatalystTypeConverters.scala:401) 在 org.apache.spark.sql.SQLContext$$anonfun$6.apply(SQLContext.scala:492) 在 org.apache.spark.sql.SQLContext$$anonfun$6.apply(SQLContext.scala:492) 在 scala.collection.Iterator$$anon$11.next(Iterator.scala:328) 在 scala.collection.Iterator$$anon$11.next(Iterator.scala:328) 在 scala.collection.Iterator$$anon$10.next(Iterator.scala:312) at scala.collection.Iterator$class.foreach(Iterator.scala:727) 在 scala.collection.AbstractIterator.foreach(Iterator.scala:1157) 在 scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:48) 在 scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:103) 在 scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:47) 在 scala.collection.TraversableOnce$class.to(TraversableOnce.scala:273) 在 scala.collection.AbstractIterator.to(Iterator.scala:1157) 在 scala.collection.TraversableOnce$class.toBuffer(TraversableOnce.scala:265) 在 scala.collection.AbstractIterator.toBuffer(Iterator.scala:1157) 在 scala.collection.TraversableOnce$class.toArray(TraversableOnce.scala:252) 在 scala.collection.AbstractIterator.toArray(Iterator.scala:1157) 在 org.apache.spark.sql.execution.SparkPlan$$anonfun$5.apply(SparkPlan.scala:212) 在 org.apache.spark.sql.execution.SparkPlan$$anonfun$5.apply(SparkPlan.scala:212) 在 org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1858) 在 org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1858) 在 org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66) 在 org.apache.spark.scheduler.Task.run(Task.scala:89) 在 org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:213) 在 java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) 在 java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) 在 java.lang.Thread.run(Thread.java:745)
我尝试从 Row 中删除复杂类型,所以它将是 Row[String, String],然后没有错误。所以我认为问题出在复杂类型上。
我做错了什么?或者我可以采用什么其他方法来生成具有复杂类型的 DataFrame?
【问题讨论】:
标签: scala apache-spark