【问题标题】:ClassCastException: cannot assign instance of scala.collection.immutable.List$SerializationProxy while using saveToCassandraClassCastException:在使用 saveToCassandra 时无法分配 scala.collection.immutable.List$SerializationProxy 的实例
【发布时间】:2017-09-12 09:11:25
【问题描述】:

我正在使用 sqlContext 的 cassandraTable() 函数从 cassandra 读取数据。它将创建一个 DataFrame。 我正在将此 Df 转换为 Rdd 并将其映射到案例类对象。 dataClass 是一个数据框。 我检查了发布的类似问题,但没有任何帮助。

val dataClass = cartData.rdd.map({case Row(session_id : String, time_stamp : Date, data : String) => cartDataClass(session_id, time_stamp, data)})

map 函数内部的匿名函数产生了问题。这是正确的吗 ?好像不能序列化函数。

dataClass 是一个 RDD[cartDataClass] 现在我正在尝试将此 RDD 保存到 cassandra。

dataClass.saveToCassandra("keyspace", "table")

但是它抛出了这个异常:

Caused by: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 2.0 failed 4 times, most recent failure: Lost task 0.3 in stage 2.0 (TID 26, 192.168.1.104): java.lang.ClassCastException: cannot assign instance of scala.collection.immutable.List$SerializationProxy to field org.apache.spark.rdd.RDD.org$apache$spark$rdd$RDD$$dependencies_ of type scala.collection.Seq in instance of org.apache.spark.rdd.MapPartitionsRDD
at java.io.ObjectStreamClass$FieldReflector.setObjFieldValues(ObjectStreamClass.java:2133)
at java.io.ObjectStreamClass.setObjFieldValues(ObjectStreamClass.java:1305)
at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2024)
at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1942)
at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1808)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1353)
at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2018)
at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1942)
at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1808)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1353)
at java.io.ObjectInputStream.readObject(ObjectInputStream.java:373)
at org.apache.spark.serializer.JavaDeserializationStream.readObject(JavaSerializer.scala:75)
at org.apache.spark.serializer.JavaSerializerInstance.deserialize(JavaSerializer.scala:114)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66)
at org.apache.spark.scheduler.Task.run(Task.scala:85)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:274)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)

这看起来像是 RDD 序列化的一些问题。 RDD 是可序列化的,那么这里有什么问题呢? 我在 scala 对象主函数中编写脚本,是因为 spark 无法序列化 scala 对象吗? 请帮忙,我是 scala 和 spark 的新手。

【问题讨论】:

  • 我不认为这是一个序列化问题。但是如果没有看到cartDataClass 的定义,就很难弄清楚你的问题。
  • 它是一个案例类。 case class cartDataClass(session_id : String, time_stamp : Date, data : String){ def modifyData() : cartDataClass { // 一些代码来改变数据字段并返回 cartDataClass 对象 }

标签: scala apache-spark serialization cassandra rdd


【解决方案1】:

如果我可以建议的话。只需将 DataFrame 本身保存到 C*。数据帧“写”方法可以与 C* 一起使用,请参阅

https://github.com/datastax/spark-cassandra-connector/blob/master/doc/14_data_frames.md#persisting-a-dataset-to-cassandra-using-the-save-command

如果不知道cartDataClass 是如何定义的,就很难知道您的依赖关系树中可能出了什么问题。我的猜测是,被序列化的 RDD 的依赖树在该类型上存在问题。

【讨论】:

    【解决方案2】:
    new SparkConf().setAppName("test").setMaster("local[2]").set("spark.executor.memory", "4g")
    

    local[2] 及其工作

    【讨论】:

    • 我可以请求您在您的答案周围添加更多上下文。仅代码的答案很难理解。如果您可以在帖子中添加更多信息,它将帮助提问者和未来的读者。另请参阅Explaining entirely code-based answers
    猜你喜欢
    • 1970-01-01
    • 2020-06-30
    • 1970-01-01
    • 1970-01-01
    • 2017-02-18
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多