【问题标题】:How can one list all csv files in an HDFS location within the Spark Scala shell?如何在 Spark Scala shell 中列出 HDFS 位置中的所有 csv 文件?
【发布时间】:2015-12-22 15:09:22
【问题描述】:

这样做的目的是为了在 HDFS 的第二个位置操作和保存每个数据文件的副本。我将使用

RddName.coalesce(1).saveAsTextFile(pathName)

将结果保存到 HDFS。

这就是为什么我想单独处理每个文件,即使我确信性能不会那么高效。但是,我还没有确定如何将 CSV 文件路径列表存储到字符串数组中,然后使用单独的 RDD 循环遍历每个字符串。

让我们使用以下匿名示例作为 HDFS 源位置:

/data/email/click/date=2015-01-01/sent_20150101.csv
/data/email/click/date=2015-01-02/sent_20150102.csv
/data/email/click/date=2015-01-03/sent_20150103.csv

我知道如何使用 Hadoop FS Shell 列出文件路径:

HDFS DFS -ls /data/email/click/*/*.csv

我知道如何为所有数据创建一个 RDD:

val sentRdd = sc.textFile( "/data/email/click/*/*.csv" )

【问题讨论】:

    标签: scala hadoop apache-spark hdfs


    【解决方案1】:

    我还没有彻底测试过,但这样的事情似乎有效:

    import org.apache.spark.deploy.SparkHadoopUtil
    import org.apache.hadoop.fs.{FileSystem, Path, LocatedFileStatus, RemoteIterator}
    import java.net.URI
    
    val path: String = ???
    
    val hconf = SparkHadoopUtil.get.newConfiguration(sc.getConf)
    val hdfs = FileSystem.get(hconf)
    val iter = hdfs.listFiles(new Path(path), false)
    
    def listFiles(iter: RemoteIterator[LocatedFileStatus]) = {
      def go(iter: RemoteIterator[LocatedFileStatus], acc: List[URI]): List[URI] = {
        if (iter.hasNext) {
          val uri = iter.next.getPath.toUri
          go(iter, uri :: acc)
        } else {
          acc
        }
      }
      go(iter, List.empty[java.net.URI])
    }
    
    listFiles(iter).filter(_.toString.endsWith(".csv"))
    

    【讨论】:

    • 有什么理由必须使用 URI?我可以只使用Path吗,返回结果是List[Path]
    • @MinnieShi 看不出有什么理由你不能
    • 尾递归可以用while循环代替
    【解决方案2】:

    这最终对我有用:

    import org.apache.hadoop.fs._
    import org.apache.spark.deploy.SparkHadoopUtil
    import java.net.URI
    
    val hdfs_conf = SparkHadoopUtil.get.newConfiguration(sc.getConf)
    val hdfs = FileSystem.get(hdfs_conf)
    // source data in HDFS
    val sourcePath = new Path("/<source_location>/<filename_pattern>")
    
    hdfs.globStatus( sourcePath ).foreach{ fileStatus =>
       val filePathName = fileStatus.getPath().toString()
       val fileName = fileStatus.getPath().getName()
    
       // < DO STUFF HERE>
    
    } // end foreach loop
    

    【讨论】:

      【解决方案3】:

      sc.wholeTextFiles(path) 应该会有所帮助。它给出(文件路径,文件内容)的rdd。

      【讨论】:

      • 那不是在使用数据吗?我只想遍历其中的每个文件路径。
      猜你喜欢
      • 1970-01-01
      • 1970-01-01
      • 2014-06-22
      • 2023-04-04
      • 1970-01-01
      • 1970-01-01
      • 2021-07-29
      • 1970-01-01
      • 2021-01-31
      相关资源
      最近更新 更多