【问题标题】:Function returns an empty List in Spark函数在 Spark 中返回一个空列表
【发布时间】:2016-03-14 17:17:50
【问题描述】:

以下是获取压缩文件中文件名列表的代码

def getListOfFilesInRepo(zipFileRDD : RDD[(String,PortableDataStream)]) : (List[String]) = {
    val zipInputStream = zipFileRDD.values.map(x => new ZipInputStream(x.open))
    val filesInZip =  new ArrayBuffer[String]()
    var ze : Option[ZipEntry] = None
    zipInputStream.foreach(stream =>{
      do{
        ze = Option(stream.getNextEntry);
        ze.foreach{ze =>
          if(ze.getName.endsWith("java") && !ze.isDirectory()){
            var fileName:String = ze.getName.substring(ze.getName.lastIndexOf("/")+1,ze.getName.indexOf(".java"))
            filesInZip += fileName
          }
        }
        stream.closeEntry()
      } while(ze.isDefined)
      println(filesInZip.toList.length) // print 889 (correct)
    })
    println(filesInZip.toList.length) // print 0 (WHY..?)
    (filesInZip.toList)
  }

我按照以下方式执行上面的代码:

scala> val zipFileRDD = sc.binaryFiles("./handsOn/repo~apache~storm~14135470~false~Java~master~2210.zip")
zipFileRDD: org.apache.spark.rdd.RDD[(String, org.apache.spark.input.PortableDataStream)] = ./handsOn/repo~apache~storm~14135470~false~Java~master~2210.zip BinaryFileRDD[17] at binaryFiles at <console>:25

scala> getListOfFilesInRepo(zipRDD)
889
0
res12: List[String] = List()

为什么我没有得到 889 而得到 0?

【问题讨论】:

    标签: scala apache-spark functional-programming scala-collections


    【解决方案1】:

    这是因为 filesInZip 没有在工作人员之间共享。 foreachfilesInZip 的本地副本进行操作,当它完成时,该副本被简单地丢弃并被垃圾收集。如果您想保留结果,您应该使用转换(很可能是 flatMap)并返回收集的聚合值。

    def listFiles(stream: PortableDataStream): TraversableOnce[String] = ???
    
    zipInputStream.flatMap(listFiles)
    

    您可以通过Understanding closures 了解更多信息

    【讨论】:

      猜你喜欢
      • 2021-12-22
      • 2021-05-24
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 2021-02-03
      • 1970-01-01
      • 1970-01-01
      • 2015-08-13
      相关资源
      最近更新 更多