【发布时间】:2015-04-30 11:48:17
【问题描述】:
我正在尝试使用 Spark 读取 CVS 文件,然后将其保存到 Cassandra。当我使用琐碎的值时,保存到 Cassandra 是有效的。
我有一个包含以下值的文件:
id,name,tag1|tag2|tag3
我想将它存储在 cassandra 表中:
id bigint, name varchar, tags set
我为此定义了一个案例类:
case class Item(id:Integer,name:String,tag:Set[String])
然后我使用这个表达式从 CVS 文件中取出 RDD
val items = sc.textFile("items.csv").map(l => l.split(",") match {case Array (a,b,c) => Item(Integer.parseInt(a),b,c.split("\\|").toSet)})
当我现在对项目(开始处理)调用 collect 或 saveToCassandra 时,我收到以下错误:
org.apache.spark.SparkException: Job aborted due to stage failure: Task 1 in stage 29.0 failed 1 times, most recent failure: Lost task 1.0 in stage 29.0 (TID 38, localhost): scala.MatchError: [Ljava.lang.String;@6030bbe6 (of class [Ljava.lang.String;)
at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$anonfun$2.apply(<console>:33)
at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$anonfun$2.apply(<console>:33)
at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
at org.apache.spark.storage.MemoryStore.unrollSafely(MemoryStore.scala:249)
at org.apache.spark.CacheManager.putInBlockManager(CacheManager.scala:172)
at org.apache.spark.CacheManager.getOrCompute(CacheManager.scala:79)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:242)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:61)
at org.apache.spark.scheduler.Task.run(Task.scala:64)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:203)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
【问题讨论】:
-
什么是 case class Item(id:Integer,name:String,tag[String]) ?标签是 Set[String] 吗?
-
是的,它是一个 Set[String]。我在上面改了
标签: scala csv apache-spark