【发布时间】:2015-07-21 20:31:51
【问题描述】:
我有许多 CSV 文件,需要通过部分文件名将它们组合成一个 RDD。
例如,对于以下文件
$ ls
20140101_1.csv 20140101_3.csv 20140201_2.csv 20140301_1.csv
20140301_3.csv 20140101_2.csv 20140201_1.csv 20140201_3.csv
我需要将名称为 20140101*.csv 的文件组合成一个 RDD 来处理它等等。
我正在使用sc.wholeTextFiles 读取整个目录,然后按文件名的模式对文件名进行分组以形成文件名字符串。
然后我将字符串传递给 sc.textFile 以将文件作为单个 RDD 打开。
这是我的代码 -
val files = sc.wholeTextFiles("*.csv")
val indexed_files = files.map(a => (a._1.split("_")(0),a._1))
val data = indexed_files.groupByKey
data.map { a =>
var name = a._2.mkString(",")
(a._1, name)
}
data.foreach { a =>
var file = sc.textFile(a._2)
println(file.count)
}
当我尝试拨打textFile 时,我得到SparkException - NullPointerException。错误堆栈指的是 RDD 中的一个迭代器。我无法理解错误 -
15/07/21 15:37:37 INFO TaskSchedulerImpl: Removed TaskSet 65.0, whose tasks have all completed, from pool
org.apache.spark.SparkException: Job aborted due to stage failure: Task 1 in stage 65.0 failed 4 times, most recent failure: Lost task 1.3 in stage 65.0 (TID 115, 10.132.8.10): java.lang.NullPointerException
at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$anonfun$1.apply(<console>:33)
at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$anonfun$1.apply(<console>:32)
at scala.collection.Iterator$class.foreach(Iterator.scala:727)
at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
at org.apache.spark.rdd.RDD$$anonfun$foreach$1$$anonfun$apply$28.apply(RDD.scala:870)
at org.apache.spark.rdd.RDD$$anonfun$foreach$1$$anonfun$apply$28.apply(RDD.scala:870)
at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1765)
at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1765)
但是,当我在 spark shell 中执行 sc.textFile(data.first._2).count 时,我能够形成 RDD 并能够检索计数。
非常感谢任何帮助。
【问题讨论】:
-
" var file = sc.textFile(a._2)" 在另一个 rdd 映射的 foreach 中不起作用。你不能像这样嵌套 RDD。
-
谢谢@Paul...我更正了它,现在我可以创建 RDD。请回答问题,我会接受。
标签: scala apache-spark rdd