【问题标题】:Check if file exists in HDFS path?检查文件是否存在于 HDFS 路径中?
【发布时间】:2020-06-02 02:06:20
【问题描述】:

在给定基本路径的情况下,如何检查文件是否存在。我正在为该方法提供一个文件列表,例如: file1.snappy, file2,snappy,...

我需要检查文件是否存在于任一给定路径中,例如:hdfs://a/b/c/source/file1.snappy 或文件是否存在于hdfs://a/b/c/target/file1.snappy。如何更新/修改以下方法以接受 /a/b/c/target//a/b/c/source/ 作为基本路径并检查文件是否存在?如果它存在于源中,则添加到源列表中,如果它在目标中,则添加到目标列表中。

  val fs = FileSystem.get(sprk.sparkContext.hadoopConfiguration)

  def fileExists(fileList:Array[String]) : Boolean = {
    var fileNotFound = 0
    fileList.foreach{
      file => {
        if(!fs.exists(new Path(file)))  fileNotFound+=1
        print("fileList",file)
      }
    }
    if(fileNotFound > 0) {
      println(fileNotFound + ": number of files not found probably moved")
      false
    }
    else
      true
  }

【问题讨论】:

  • 你的预期输出是什么?
  • 布尔值 true 或 false,取决于文件是否存在于源或目标中。我可以调用该方法两次,这不是问题。 @Srinivas
  • @coderWorld 根据我的经验,它完全符合您的 s3 要求。和测试用例

标签: scala hadoop


【解决方案1】:

更新代码以适用于 hdfss3

请检查以下代码。

scala> :paste
// Entering paste mode (ctrl-D to finish)

import org.apache.hadoop.fs.{FileSystem, Path, RemoteIterator}
import org.apache.hadoop.fs._

// For converting to scala Iterator
implicit def convertToScalaIterator[T](remoteIterator: RemoteIterator[T]): Iterator[T] = {
    case class wrapper(remoteIterator: RemoteIterator[T]) extends Iterator[T] {
      override def hasNext: Boolean = remoteIterator.hasNext
      override def next(): T = remoteIterator.next()
    }
    wrapper(remoteIterator)
}

import java.net.URI

def fs(path: String) = FileSystem.get(URI.create(path),spark.sparkContext.hadoopConfiguration)

// Exiting paste mode, now interpreting.

import org.apache.hadoop.fs.{FileSystem, Path, RemoteIterator}
import org.apache.hadoop.fs._
convertToScalaIterator: [T](remoteIterator: org.apache.hadoop.fs.RemoteIterator[T])Iterator[T]
fs: org.apache.hadoop.fs.FileSystem = org.apache.hadoop.fs.LocalFileSystem@640517de

示例目录

scala> "tree /tmp/examples".!
/tmp/examples

0 directories, 0 files

scala> "tree /tmp/sample".!
/tmp/sample
├── aaa
│   └── sample.json
├── bbb
│   └── bbb.json
├── ccc
└── ddd

4 directories, 2 files

结果

scala> List("/tmp/sample","/tmp/examples")
.flatMap(dir => {
    fs(dir)
    .listFiles(new Path(dir),true)
    .toList
    .filter(_.isFile)
    .map(d => (d.getPath.getParent,d.getPath))
// If you want only Boolean values, May be change above line to ```.map(d => (d.getPath.getParent,d.isFile))``` 
})
.foreach(println)

(/tmp/sample/bbb,file:/tmp/sample/bbb/bbb.json)
(/tmp/sample/aaa,file:/tmp/sample/aaa/sample.json)


【讨论】:

  • 我认为这可能不适用于 s3 文件 Rams 答案可能有效
  • 谢谢,我已经更新了代码,现在它适用于hdfss3
  • 虽然我不喜欢implicits,但这是一个不错的解决方案,尤其是listFiles 对于S3 非常有效(异步预取下一页结果)。提示:如果您在 Hadoop 3.3.1 中的迭代器上调用 toString(),它还会为您提供有关 LIST 调用次数及其延迟的统计信息。
【解决方案2】:

我有一个源目录和目标如下例所示

尝试这种方式进行递归查找

URI.create(... ) 在处理 s3 对象时非常有用(也适用于 hdfs / local fs)

import java.net.URI

import org.apache.hadoop.fs.{FileSystem, LocatedFileStatus, Path, RemoteIterator}
import org.apache.log4j.{Level, Logger}
import org.apache.spark.SparkContext
import org.apache.spark.sql.SparkSession
/**
    * getAllFiles - get all files recursively from the sub folders.
    * 
    * @param path String
    * @param sc SparkContext
    * @return Seq[String]
    */
  def getAllFiles(path: String, sc: SparkContext): Seq[String] = {
    val conf = sc.hadoopConfiguration
    val fs = FileSystem.get(URI.create(path), conf)
    val files: RemoteIterator[LocatedFileStatus] = fs.listFiles(new Path(path), true) // true for recursive lookup
    val buf = new ArrayBuffer[String]
    while (files.hasNext()) {
      val fileStatus = files.next();
      buf.append(fileStatus.getPath().toString)
    }
    buf.toSeq
  }

用法示例:

 val spark: SparkSession = SparkSession.builder.appName(getClass.getName)
    .master("local[*]").getOrCreate


  val sc = spark.sparkContext

  val myfiles: Seq[String] = getAllFiles("data/test_table", sc)
  myfiles.foreach(println)
  println(myfiles.contains("/data/test_table/source/part-00000-19b67f0c-3fb0-4718-8a31-ac770e2dc0ba-c000.snappy.parquet"))

结果:

/data/test_table/target/part-00000-9205704a-cb0c-4933-87d4-c21313e76297-c000.snappy.parquet
/data/test_table/target/part-00000-19b67f0c-3fb0-4718-8a31-ac770e2dc0ba-c000.snappy1111.parquet
/data/test_table/target/part-00000-9205704a-cb0c-4933-87d4-c21313e76297-c000.snappy11.parquet
/data/test_table/source/part-00000-19b67f0c-3fb0-4718-8a31-ac770e2dc0ba-c000.snappy1.parquet
/data/test_table/source/part-00000-19b67f0c-3fb0-4718-8a31-ac770e2dc0ba-c000.snappy111.parquet
/data/test_table/source/part-00000-19b67f0c-3fb0-4718-8a31-ac770e2dc0ba-c000.snappy.parquet


true

【讨论】:

  • 已添加导入
  • 我的方法也需要 List:Array[String] 作为参数。但你的只是一个字符串。
  • 稍微更新了问题,如果我需要将这些元素添加到 sourceList 或 destinationList 中,而不是只返回 True 或 False,这取决于路径是否包含源或目标?
  • 这些是您可以根据需要进行调整的微小/次要的事情。但抓住如何做到这一点的主要概念。希望你理解主题
  • 好。通用解决方案..它适用于 HDFS 和 S3 .. 我的仅适用于 HDFS.. :)
猜你喜欢
  • 2011-07-01
  • 2023-03-17
  • 2011-12-14
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
相关资源
最近更新 更多