【问题标题】:Scala - How to merge incremental files of HDFS locationScala - 如何合并 HDFS 位置的增量文件
【发布时间】:2021-01-31 07:49:06
【问题描述】:

我的要求是我有多个 HDFS 位置,每小时从 Kafka 摄取文件。因此,对于每个目录,如何将特定时间戳的所有文件合并到当前时间戳作为单个拼花文件,下一次将文件从最后合并的时间戳合并到当前时间戳,并在未来重复相同的操作。这就是我在 Spark Scala 工作中要做的所有事情,所以不能使用普通的 shell 脚本。 任何建议表示赞赏。

【问题讨论】:

  • 显示一个有点难以理解的例子
  • 看看三角洲湖
  • 您需要跨目录合并文件还是只能在目录内进行合并?您希望多久运行一次合并?我
  • 嗨@Sugesh,我想从任何特定目录将一天的所有hdfs文件(根据文件的unix时间戳)合并为单个文件,这意味着如果1000个文件可用1个月,那么总共 30/31 个文件应该作为输出合并文件,每天一个文件。请建议方法。
  • 但我想使用 Spark Scala 而不是 Shell/Unix 脚本。

标签: scala shell file apache-spark hdfs


【解决方案1】:

这是一个有助于完成任务的代码 sn-p。

第一步是获取每个日期的文件列表作为地图。 (Map[String, List[String]]) 其中键是日期,值是具有相同日期的文件列表。日期取自 HDFS 文件的修改时间戳。

注意:使用本地路径测试代码,根据需要给出正确的HDFS路径/url。

在编写输出时,没有直接选项来指定目标文件名,但您可以指定每个日期特定的目标目录。代码让我们使用 FileSystem API 将文件重命名为所需的文件并删除每个日期创建的临时输出文件夹。

import org.apache.hadoop.conf.Configuration
import org.apache.spark.sql.SparkSession

import org.apache.hadoop.fs.{FileStatus, FileSystem, Path}
import org.apache.spark.SparkContext
import org.joda.time.format.DateTimeFormat


object MergeFiles {

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("Merging files day wise in a directory")
      .master("local[2]")
      .getOrCreate()

    val inputDir = "/Users/sujesh/test_data"
    val outputDir = "/Users/sujesh/output_data"

    val hadoopConf = spark.sparkContext.hadoopConfiguration
    val fs = FileSystem.get(hadoopConf)

    val filesPerDate = getFiles(inputDir, fs)

    filesPerDate
      .foreach { m =>
        spark
          .read
          .format("csv")
          .option("inferSchema", false)
          .option("header", false)
          .load(m._2:_*)
          .repartition(1)
          .write
          .format("csv")
          .save(s"$outputDir/${m._1}")

        val file = fs.globStatus(new Path(s"$outputDir/${m._1}/part*.csv"))(0).getPath.getName
        fs.rename(new Path(s"$outputDir/${m._1}/$file"), new Path(s"$outputDir/${m._1}.csv"))
        fs.delete(new Path(s"$outputDir/${m._1}"), true)
      }
  }

  /*
    Get the list of files group by date
    date is taken from file's modification timestamp
   */
  def getFiles(dir: String, fs: FileSystem) = {
    fs
      .globStatus(new Path(s"$dir/*.csv"))
      .map { f: FileStatus =>
        (DateTimeFormat.forPattern("yyyyMMdd").print(f.getModificationTime), f.getPath.toUri.getRawPath)
       }.groupBy(_._1)
       .map { case (k,v) => (k -> v.map(_._2).toSeq) }
  }
}

您可以在测试后进一步优化代码并将文件重命名代码转换为实用程序(如果必须重新使用)。已将 inferSchemaheader 等所有选项设置为 false。根据需要使用它们。这种方法也适用于其他格式的文件。

注意:如果您在同一目录中重复执行此过程,则需要进一步调整,因为新创建的文件将具有最新的时间戳。因此,如果这不是每天运行,您也需要显式更新文件的修改时间戳或忽略具有文件名模式的文件,例如yyyyMMdd.csv

【讨论】:

  • 嗨@Sujesh,我正在尝试连接我的hdfs(dev env。)集群并根据您给定的代码获取带有处理时间戳的hdfs文件,但我无法连接。收到以'hdfs://........'开头的未找到hdfs路径的错误。
  • 您是否能够运行 hdfs 命令并从您运行此代码的位置访问文件?
猜你喜欢
  • 1970-01-01
  • 2021-07-29
  • 1970-01-01
  • 2015-12-22
  • 2021-12-16
  • 1970-01-01
  • 2019-11-14
  • 1970-01-01
  • 1970-01-01
相关资源
最近更新 更多