【发布时间】:2017-09-12 09:11:25
【问题描述】:
我正在使用 sqlContext 的 cassandraTable() 函数从 cassandra 读取数据。它将创建一个 DataFrame。 我正在将此 Df 转换为 Rdd 并将其映射到案例类对象。 dataClass 是一个数据框。 我检查了发布的类似问题,但没有任何帮助。
val dataClass = cartData.rdd.map({case Row(session_id : String, time_stamp : Date, data : String) => cartDataClass(session_id, time_stamp, data)})
map 函数内部的匿名函数产生了问题。这是正确的吗 ?好像不能序列化函数。
dataClass 是一个 RDD[cartDataClass] 现在我正在尝试将此 RDD 保存到 cassandra。
dataClass.saveToCassandra("keyspace", "table")
但是它抛出了这个异常:
Caused by: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 2.0 failed 4 times, most recent failure: Lost task 0.3 in stage 2.0 (TID 26, 192.168.1.104): java.lang.ClassCastException: cannot assign instance of scala.collection.immutable.List$SerializationProxy to field org.apache.spark.rdd.RDD.org$apache$spark$rdd$RDD$$dependencies_ of type scala.collection.Seq in instance of org.apache.spark.rdd.MapPartitionsRDD
at java.io.ObjectStreamClass$FieldReflector.setObjFieldValues(ObjectStreamClass.java:2133)
at java.io.ObjectStreamClass.setObjFieldValues(ObjectStreamClass.java:1305)
at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2024)
at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1942)
at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1808)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1353)
at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2018)
at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1942)
at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1808)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1353)
at java.io.ObjectInputStream.readObject(ObjectInputStream.java:373)
at org.apache.spark.serializer.JavaDeserializationStream.readObject(JavaSerializer.scala:75)
at org.apache.spark.serializer.JavaSerializerInstance.deserialize(JavaSerializer.scala:114)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66)
at org.apache.spark.scheduler.Task.run(Task.scala:85)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:274)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
这看起来像是 RDD 序列化的一些问题。 RDD 是可序列化的,那么这里有什么问题呢? 我在 scala 对象主函数中编写脚本,是因为 spark 无法序列化 scala 对象吗? 请帮忙,我是 scala 和 spark 的新手。
【问题讨论】:
-
我不认为这是一个序列化问题。但是如果没有看到
cartDataClass的定义,就很难弄清楚你的问题。 -
它是一个案例类。 case class cartDataClass(session_id : String, time_stamp : Date, data : String){ def modifyData() : cartDataClass { // 一些代码来改变数据字段并返回 cartDataClass 对象 }
标签: scala apache-spark serialization cassandra rdd