【发布时间】:2016-10-28 21:26:26
【问题描述】:
我有一些中间数据需要存储在 HDFS 和本地。我正在使用 Spark 1.6。在作为中间形式的 HDFS 中,我在/output/testDummy/part-00000 和/output/testDummy/part-00001 中获取数据。我想使用 Java/Scala 将这些分区保存在本地,以便我可以将它们分别保存为/users/home/indexes/index.nt(通过在本地合并)或/users/home/indexes/index-0000.nt 和/home/indexes/index-0001.nt。
这是我的代码:
注意:testDummy 和 test 一样,输出是两个分区。我想将它们单独存储或与index.nt 文件组合但在本地存储。我更喜欢分别存储在两个数据节点中。我正在使用集群并在 YARN 上提交火花作业。我还添加了一些 cmets,我得到了多少次和什么数据。我该怎么办?任何帮助表示赞赏。
val testDummy = outputFlatMapTuples.coalesce(Constants.INITIAL_PARTITIONS).saveAsTextFile(outputFilePathForHDFS+"/testDummy")
println("testDummy done") //1 time print
def savesData(iterator: Iterator[(String)]): Iterator[(String)] = {
println("Inside savesData") // now 4 times when coalesce(Constants.INITIAL_PARTITIONS)=2
println("iter size"+iterator.size) // 2 735 2 735 values
val filenamesWithExtension = outputPath + "/index.nt"
println("filenamesWithExtension "+filenamesWithExtension.length) //4 times
var list = List[(String)]()
val fileWritter = new FileWriter(filenamesWithExtension,true)
val bufferWritter = new BufferedWriter(fileWritter)
while (iterator.hasNext){ //iterator.hasNext is false
println("inside iterator") //0 times
val dat = iterator.next()
println("datadata "+iterator.next())
bufferWritter.write(dat + "\n")
bufferWritter.flush()
println("index files written")
val dataElements = dat.split(" ")
println("dataElements") //0
list = list.::(dataElements(0))
list = list.::(dataElements(1))
list = list.::(dataElements(2))
}
bufferWritter.close() //closing
println("savesData method end") //4 times when coal=2
list.iterator
}
println("before saving data into local") //1
val test = outputFlatMapTuples.coalesce(Constants.INITIAL_PARTITIONS).mapPartitions(savesData)
println("testRDD partitions "+test.getNumPartitions) //2
println("testRDD size "+test.collect().length) //0
println("after saving data into local") //1
PS:我关注了this 和this,但与我正在寻找的不完全相同,我以某种方式做了,但在index.nt 中没有得到任何东西
【问题讨论】:
-
Scala 通过使
list.::(dataElements(2))等同于dataElements(2) :: list让世界变得更好了一点,所以不要让Scala 伤心,至少在方法类似于运算符时开始使用这种语法。顺便说一句,ListBuffer在这里可能更合适
标签: java scala hadoop apache-spark hdfs