这是一个有助于完成任务的代码 sn-p。
第一步是获取每个日期的文件列表作为地图。 (Map[String, List[String]]) 其中键是日期,值是具有相同日期的文件列表。日期取自 HDFS 文件的修改时间戳。
注意:使用本地路径测试代码,根据需要给出正确的HDFS路径/url。
在编写输出时,没有直接选项来指定目标文件名,但您可以指定每个日期特定的目标目录。代码让我们使用 FileSystem API 将文件重命名为所需的文件并删除每个日期创建的临时输出文件夹。
import org.apache.hadoop.conf.Configuration
import org.apache.spark.sql.SparkSession
import org.apache.hadoop.fs.{FileStatus, FileSystem, Path}
import org.apache.spark.SparkContext
import org.joda.time.format.DateTimeFormat
object MergeFiles {
def main(args: Array[String]): Unit = {
val spark = SparkSession.builder()
.appName("Merging files day wise in a directory")
.master("local[2]")
.getOrCreate()
val inputDir = "/Users/sujesh/test_data"
val outputDir = "/Users/sujesh/output_data"
val hadoopConf = spark.sparkContext.hadoopConfiguration
val fs = FileSystem.get(hadoopConf)
val filesPerDate = getFiles(inputDir, fs)
filesPerDate
.foreach { m =>
spark
.read
.format("csv")
.option("inferSchema", false)
.option("header", false)
.load(m._2:_*)
.repartition(1)
.write
.format("csv")
.save(s"$outputDir/${m._1}")
val file = fs.globStatus(new Path(s"$outputDir/${m._1}/part*.csv"))(0).getPath.getName
fs.rename(new Path(s"$outputDir/${m._1}/$file"), new Path(s"$outputDir/${m._1}.csv"))
fs.delete(new Path(s"$outputDir/${m._1}"), true)
}
}
/*
Get the list of files group by date
date is taken from file's modification timestamp
*/
def getFiles(dir: String, fs: FileSystem) = {
fs
.globStatus(new Path(s"$dir/*.csv"))
.map { f: FileStatus =>
(DateTimeFormat.forPattern("yyyyMMdd").print(f.getModificationTime), f.getPath.toUri.getRawPath)
}.groupBy(_._1)
.map { case (k,v) => (k -> v.map(_._2).toSeq) }
}
}
您可以在测试后进一步优化代码并将文件重命名代码转换为实用程序(如果必须重新使用)。已将 inferSchema 或 header 等所有选项设置为 false。根据需要使用它们。这种方法也适用于其他格式的文件。
注意:如果您在同一目录中重复执行此过程,则需要进一步调整,因为新创建的文件将具有最新的时间戳。因此,如果这不是每天运行,您也需要显式更新文件的修改时间戳或忽略具有文件名模式的文件,例如yyyyMMdd.csv