【问题标题】:Save a spark RDD using mapPartition with iterator使用带有迭代器的 mapPartition 保存 spark RDD
【发布时间】:2016-10-28 21:26:26
【问题描述】:

我有一些中间数据需要存储在 HDFS 和本地。我正在使用 Spark 1.6。在作为中间形式的 HDFS 中,我在/output/testDummy/part-00000/output/testDummy/part-00001 中获取数据。我想使用 Java/Scala 将这些分区保存在本地,以便我可以将它们分别保存为/users/home/indexes/index.nt(通过在本地合并)或/users/home/indexes/index-0000.nt/home/indexes/index-0001.nt

这是我的代码: 注意:testDummy 和 test 一样,输出是两个分区。我想将它们单独存储或与index.nt 文件组合但在本地存储。我更喜欢分别存储在两个数据节点中。我正在使用集群并在 YARN 上提交火花作业。我还添加了一些 cmets,我得到了多少次和什么数据。我该怎么办?任何帮助表示赞赏。

 val testDummy = outputFlatMapTuples.coalesce(Constants.INITIAL_PARTITIONS).saveAsTextFile(outputFilePathForHDFS+"/testDummy")
 println("testDummy done")   //1 time print

def savesData(iterator: Iterator[(String)]): Iterator[(String)] = {
    println("Inside savesData")                                 //  now 4 times when coalesce(Constants.INITIAL_PARTITIONS)=2
    println("iter size"+iterator.size)                           //  2 735 2 735 values
    val filenamesWithExtension = outputPath + "/index.nt"
    println("filenamesWithExtension "+filenamesWithExtension.length)   //4 times
    var list = List[(String)]()

    val fileWritter = new FileWriter(filenamesWithExtension,true)
    val bufferWritter = new BufferedWriter(fileWritter)

     while (iterator.hasNext){                       //iterator.hasNext is false
       println("inside iterator")                    //0 times 
       val dat = iterator.next()
       println("datadata "+iterator.next())

       bufferWritter.write(dat + "\n")
       bufferWritter.flush()
       println("index files written")

       val dataElements = dat.split(" ")
       println("dataElements")                                    //0
       list = list.::(dataElements(0))
       list = list.::(dataElements(1))
       list = list.::(dataElements(2))
     }
    bufferWritter.close() //closing
    println("savesData method end")                         //4 times when coal=2
    list.iterator
}

println("before saving data into local")                              //1
val test = outputFlatMapTuples.coalesce(Constants.INITIAL_PARTITIONS).mapPartitions(savesData)
println("testRDD partitions "+test.getNumPartitions)                               //2
println("testRDD size "+test.collect().length)                                //0
println("after saving data into local")   //1

PS:我关注了thisthis,但与我正在寻找的不完全相同,我以某种方式做了,但在index.nt 中没有得到任何东西

【问题讨论】:

  • Scala 通过使list.::(dataElements(2)) 等同于dataElements(2) :: list 让世界变得更好了一点,所以不要让Scala 伤心,至少在方法类似于运算符时开始使用这种语法。顺便说一句,ListBuffer 在这里可能更合适

标签: java scala hadoop apache-spark hdfs


【解决方案1】:

有几点:

  • 如果您打算以后使用数据,切勿致电Iterator.sizeIteratorsTraversableOnce。计算Iterator 大小的唯一方法是遍历它的所有元素,然后就没有更多数据要读取了。
  • 不要使用像mapPartitions 这样的转换来产生副作用。如果您想执行某种类型的 IO,请使用 foreach / foreachPartition 之类的操作。这是一种不好的做法,并且不能保证给定的代码只会执行一次。
  • 动作或转换中的本地路径是特定工作人员的本地路径。如果您想直接在客户端计算机上写入,您应该首先使用collecttoLocalIterator 获取数据。稍后写入分布式存储并获取数据可能会更好。

【讨论】:

    【解决方案2】:

    Java 7 提供了监视目录的方法。

    https://docs.oracle.com/javase/tutorial/essential/io/notification.html

    想法是创建一个监视服务,将其注册到感兴趣的目录(提及您感兴趣的事件,例如文件创建,删除等),监视,您将收到任何事件的通知,例如创建,删除等,您可以采取任何您想要的操作。

    在适用的情况下,您将不得不严重依赖 Java hdfs api。

    在后台运行程序,因为它永远等待事件。 (你可以写逻辑在你做任何你想做的事后退出)

    另一方面,shell 脚本也会有所帮助。

    读取文件时注意hdfs文件系统的一致性模型。

    希望这有助于一些想法。

    【讨论】:

      猜你喜欢
      • 2016-10-22
      • 2014-09-01
      • 2015-09-13
      • 2015-06-14
      • 1970-01-01
      • 1970-01-01
      • 2017-06-15
      • 2017-06-30
      • 2016-10-19
      相关资源
      最近更新 更多