【问题标题】:Spark Scala list folders in directorySpark Scala 列出目录中的文件夹
【发布时间】:2016-01-28 10:51:56
【问题描述】:

我想使用 Scala/Spark 列出 hdfs 目录中的所有文件夹。 在 Hadoop 中,我可以使用以下命令来做到这一点:hadoop fs -ls hdfs://sandbox.hortonworks.com/demo/

我试过了:

val conf = new Configuration()
val fs = FileSystem.get(new URI("hdfs://sandbox.hortonworks.com/"), conf)

val path = new Path("hdfs://sandbox.hortonworks.com/demo/")

val files = fs.listFiles(path, false)

但他似乎没有在 Hadoop 目录中查找,因为我找不到我的文件夹/文件。

我也试过了:

FileSystem.get(sc.hadoopConfiguration).listFiles(new Path("hdfs://sandbox.hortonworks.com/demo/"), true)

但这也无济于事。

你还有什么想法吗?

PS:我也检查了这个线程:Spark iterate HDFS directory 但它对我不起作用,因为它似乎没有在 hdfs 目录上搜索,而是只在具有架构文件的本地文件系统上搜索//。

【问题讨论】:

  • 这个解决方案帮助我解决了一个错误。我需要编写val fs = FileSystem.get(new URI("s3://mybucket/mykey"), conf) 之类的代码来获取正确的文件系统供 spark 使用。默认文件系统用于 hdfs。

标签: scala hadoop apache-spark


【解决方案1】:

我们使用的是 hadoop 1.4,它没有 listFiles 方法,所以我们使用 listStatus 来获取目录。它没有递归选项,但很容易管理递归查找。

val fs = FileSystem.get(new Configuration())
val status = fs.listStatus(new Path(YOUR_HDFS_PATH))
status.foreach(x=> println(x.getPath))

【讨论】:

  • 非常感谢,listStatus 可以更好地获取文件夹并且运行良好!就我而言,我不需要递归查找,所以这很好。 一个补充:当我使用您的编码时,文件系统架构是 file:// 并且我不能使用 hdfs:// 作为架构。所以我以这种方式创建了文件系统:val conf = new Configuration() val fs = FileSystem.get(new URI("hdfs://sandbox.hortonworks.com/"), conf)。然后文件系统接受 hdfs:// 路径。
  • “错误:未找到:类型配置”,如何导入或准备?使用import org.apache.hadoop.conf.Configuration
【解决方案2】:

在 Spark 2.0+ 中,

import org.apache.hadoop.fs.{FileSystem, Path}
val fs = FileSystem.get(spark.sparkContext.hadoopConfiguration)
fs.listStatus(new Path(s"${hdfs-path}")).filter(_.isDir).map(_.getPath).foreach(println)

希望这有帮助。

【讨论】:

    【解决方案3】:

    在 Ajay Ahujas 中回答 isDir 已弃用..

    使用isDirectory...请参阅下面的完整示例和输出。

    package examples
    
        import org.apache.log4j.Level
        import org.apache.spark.sql.SparkSession
    
        object ListHDFSDirectories  extends  App{
          val logger = org.apache.log4j.Logger.getLogger("org")
          logger.setLevel(Level.WARN)
          val spark = SparkSession.builder()
            .appName(this.getClass.getName)
            .config("spark.master", "local[*]").getOrCreate()
    
          val hdfspath = "." // your path here
          import org.apache.hadoop.fs.{FileSystem, Path}
          val fs = org.apache.hadoop.fs.FileSystem.get(spark.sparkContext.hadoopConfiguration)
          fs.listStatus(new Path(s"${hdfspath}")).filter(_.isDirectory).map(_.getPath).foreach(println)
        }
    

    结果:

    file:/Users/user/codebase/myproject/target
    file:/Users/user/codebase/myproject/Rel
    file:/Users/user/codebase/myproject/spark-warehouse
    file:/Users/user/codebase/myproject/metastore_db
    file:/Users/user/codebase/myproject/.idea
    file:/Users/user/codebase/myproject/src
    

    【讨论】:

      【解决方案4】:

      我一直在寻找相同的,但不是 HDFS,而是用于 S3

      我解决了使用 S3 路径创建文件系统的问题,如下所示:

        def getSubFolders(path: String)(implicit sparkContext: SparkContext): Seq[String] = {
          val hadoopConf = sparkContext.hadoopConfiguration
          val uri = new URI(path)
      
          FileSystem.get(uri, hadoopConf).listStatus(new Path(path)).map {
            _.getPath.toString
          }
        }
      

      我知道这个问题与 HDFS 有关,但也许像我这样的其他人会来这里寻找 S3 解决方案。由于没有在 FileSystem 中指定 URI,它会查找 HDFS 的。

      java.lang.IllegalArgumentException: Wrong FS: s3://<bucket>/dummy_path
      expected: hdfs://<ip-machine>.eu-west-1.compute.internal:8020
      

      【讨论】:

      • 你能举个例子吗?
      【解决方案5】:
         val listStatus = org.apache.hadoop.fs.FileSystem.get(new URI(url), sc.hadoopConfiguration)
      .globStatus(new org.apache.hadoop.fs.Path(url))
      
        for (urlStatus <- listStatus) {
          println("urlStatus get Path:" + urlStatus.getPath())
      

      }

      【讨论】:

        【解决方案6】:
        val spark = SparkSession.builder().appName("Demo").getOrCreate()
        val path = new Path("enter your directory path")
        val fs:FileSystem = projects.getFileSystem(spark.sparkContext.hadoopConfiguration)
        val it = fs.listLocatedStatus(path)
        

        这将在 org.apache.hadoop.fs.LocatedFileStatus 上创建一个迭代器 it,这是您的子目录

        【讨论】:

          【解决方案7】:

          Azure 博客存储映射到一个 HDFS 位置,因此所有 Hadoop 操作

          Azure Portal,进入Storage Account,您会看到以下详细信息:

          • 存储帐户

          • 键 -

          • 容器 -

          • 路径模式 – /users/accountsdata/

          • 日期格式 - yyyy-mm-dd

          • 事件序列化格式——json

          • 格式 - 行分隔

          Path Pattern 这里是HDFS的路径,你可以登录/putty到Hadoop边缘节点然后做:

          hadoop fs -ls /users/accountsdata 
          

          以上命令将列出所有文件。在 Scala 中,您可以使用

          import scala.sys.process._ 
          
          val lsResult = Seq("hadoop","fs","-ls","/users/accountsdata/").!!
          

          【讨论】:

          • 当 Spark History Server 出现内存问题时,它帮助我获取 Jupyter notebook 中的日志大小。
          【解决方案8】:
          object HDFSProgram extends App {    
            val uri = new URI("hdfs://HOSTNAME:PORT")    
            val fs = FileSystem.get(uri,new Configuration())    
            val filePath = new Path("/user/hive/")    
            val status = fs.listStatus(filePath)    
            status.map(sts => sts.getPath).foreach(println)    
          }
          

          这是获取 /user/hive/ 下存在的 hdfs 文件或文件夹列表的示例代码

          【讨论】:

            【解决方案9】:

            由于您使用的是 Scala,您可能还对以下内容感兴趣:

            import scala.sys.process._
            val lsResult = Seq("hadoop","fs","-ls","hdfs://sandbox.hortonworks.com/demo/").!!
            

            不幸的是,这会将命令的整个输出作为字符串返回,因此仅解析为文件名需要一些努力。 (请改用fs.listStatus。)但是,如果您发现自己需要运行其他可以在命令行中轻松执行的命令,并且不确定如何在Scala 中执行此操作,只需通过scala.sys.process._ 使用命令行即​​可。 (如果您只想获取返回码,请使用单个 !。)

            【讨论】:

              猜你喜欢
              • 2021-03-21
              • 1970-01-01
              • 2011-06-29
              • 2013-12-25
              • 1970-01-01
              • 2020-12-22
              • 1970-01-01
              • 2014-01-06
              • 2016-11-11
              相关资源
              最近更新 更多