【问题标题】:Rename and Move S3 files based on their folders name in spark scala根据 spark scala 中的文件夹名称重命名和移动 S3 文件
【发布时间】:2018-06-25 03:21:31
【问题描述】:

我在 s3 文件夹中有 spark 输出,我想将所有 s3 文件从该输出文件夹移动到另一个位置,但在移动时我想重命名文件。

例如,我在 S3 文件夹中有文件,如下所示

现在我想重命名所有文件并放入另一个目录,但文件的名称如下所示

Fundamental.FinancialStatement.FinancialStatementLineItems.Japan.1971-BAL.1.2017-10-18-0439.Full.txt
Fundamental.FinancialStatement.FinancialStatementLineItems.Japan.1971-BAL.2.2017-10-18-0439.Full.txt
Fundamental.FinancialStatement.FinancialStatementLineItems.Japan.1971-BAL.3.2017-10-18-0439.Full.txt

这里 Fundamental.FinancialStatement 是所有文件中的常量 2017-10-18-0439 当前日期时间。

这是我迄今为止尝试过的,但无法获取文件夹名称并循环遍历所有文件

    import org.apache.hadoop.fs._

val src = new Path("s3://trfsmallfffile/Segments/output")
val dest = new Path("s3://trfsmallfffile/Segments/Finaloutput")
val conf = sc.hadoopConfiguration   // assuming sc = spark context
val fs = src.getFileSystem(conf)
//val file = fs.globStatus(new Path("src/DataPartition=Japan/part*.gz"))(0).getPath.getName
//println(file)
val status = fs.listStatus(src)    

status.foreach(filename => {
               val a = filename.getPath.getName.toString()
                println("file name"+a)
                //println(filename)
             })

这给了我下面的输出

    file nameDataPartition=Japan
file nameDataPartition=SelfSourcedPrivate
file nameDataPartition=SelfSourcedPublic
file name_SUCCESS

这给了我文件夹的详细信息,而不是文件夹内的文件。

参考取自这里Stack Overflow Refrence

【问题讨论】:

  • 您是否尝试使用fs.listFiles 然后map 重命名每个文件?
  • @philantrovert 是的,但是提取文件夹名称然后移动我做不到..你能放一些示例代码吗
  • 我不明白。一旦您执行fs.rename(src, dest)dest 就不会成为所有需要重命名的文件所在的文件夹名称。抱歉,如果我遗漏了什么。我在代理后面,看不到图像。
  • @philantrovert 我已经更新了mu代码,请看一下
  • Arthav:在堆栈溢出时调试单独的代码行不是一种可行的开发实践。我会推荐一个像 IntelliJ IDEA 这样的 IDE,用于测试的 scalatest,并设置断点。您的代码已损坏,这是逐步调试会告诉您的。

标签: scala apache-spark amazon-s3


【解决方案1】:

您正在获取目录,因为您在 s3 中有子目录级别。

/*/* to go in subdir .

试试这个

    import org.apache.hadoop.fs._

val src = new Path("s3://trfsmallfffile/Segments/Output/*/*")
val dest = new Path("s3://trfsmallfffile/Segments/FinalOutput")
val conf = sc.hadoopConfiguration   // assuming sc = spark context
val fs = src.getFileSystem(conf)

val file = fs.globStatus(new Path("s3://trfsmallfffile/Segments/Output/*/*"))


  for (urlStatus <- file) {
    //println("S3 FILE PATH IS ===:" + urlStatus.getPath)
    val partitioName=urlStatus.getPath.toString.split("=")(1).split("\\/")(0).toString
    val finalPrefix="Fundamental.FinancialLineItem.Segments."
    val finalFileName=finalPrefix+partitioName+".txt"
    val dest = new Path("s3://trfsmallfffile/Segments/FinalOutput"+"/"+finalFileName+ " ")
    fs.rename(urlStatus.getPath, dest)
  }

【讨论】:

    【解决方案2】:

    这在过去对我有用

    import org.apache.hadoop.fs.{FileSystem, Path}
    import org.apache.hadoop.conf.Configuration 
    val path = "s3://<bucket>/<directory>"
    val fs = FileSystem.get(new java.net.URI(path), spark.sparkContext.hadoopConfiguration)
    fs.listStatus(new Path(path))
    

    列表状态提供s3目录下的所有文件

    【讨论】:

    • 不,它不给我文件它只列出文件夹详细信息
    猜你喜欢
    • 1970-01-01
    • 2016-03-29
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2014-08-10
    • 1970-01-01
    • 2016-09-11
    相关资源
    最近更新 更多