【发布时间】:2018-04-15 00:27:38
【问题描述】:
我有下面的代码来启动一个 spark 作业,当我处理少于 40 个(我的机器中的最大内核数)的文件时,并行化工作正常,但是当我处理的文件超过这个时,它会产生麻烦。请有任何建议。 `
object Cleanup extends Processor {
def main(args: Array[String]): Unit = {
val fileSeeker = new TelemetryFileSeeker("Config")
val files = fileSeeker.searchFiles(bucketName, urlPrefix, "2018-01-01T00:00:00.000Z", "2018-04-30T00:00:00.000Z").filter(_.endsWith(".gz"))
.map(each => (each, each.slice(0, each.lastIndexOf("/")))).slice(0,100)
if (files.nonEmpty) {
println("Number of Files" + files.length)
sc.parallelize(files).map(each => changeFormat(each)).collect()
}
}
def changeFormat(file: (String, String)): Unit = {
val fileProcessor = new Processor("Config", sparksession)
val uuid = java.util.UUID.randomUUID.toString
val tempInput = "inputfolder" + uuid
val tempOutput = "outputfolder" + uuid
val inpaths = Paths.get(tempInput)
val outpaths = Paths.get(tempOutput)
if (Files.notExists(inpaths)) Files.createDirectory(inpaths)
if (Files.notExists(outpaths)) Files.createDirectory(outpaths)
val downloadedFiles = fileProcessor.downloadAndUnzip(bucketName, List(file._1), tempInput)
val parsedFiles = fileProcessor.parseCSV(downloadedFiles)
parsedFiles.select(
"pa1",
"pa2",
"pa3"
).withColumn("pa4", lit(0.0)).write.mode(SaveMode.Overwrite).format(CSV_FORMAT)
.option("codec", "org.apache.hadoop.io.compress.GzipCodec").save(tempOutput)
val processedFiles = new File(tempOutput).listFiles.filter(_.getName.endsWith(".gz"))
val filesNames = processedFiles.map(_.getName).toList
val filesPaths = processedFiles.map(_.getPath).toList
fileProcessor.cleanUpRemote(bucketName, "new/" + file._2, filesNames)
fileProcessor.uploadFiles(bucketName, "new/" + file._2, filesPaths)
fileProcessor.cleanUpLocal(tempInput, tempOutput)
val remoteFiles = fileProcessor.checkRemote(bucketName, "new/" + file._2, filesNames)
logger.info("completed " + file._1)
}
}
下面的spark配置
lazy val spark = SparkSession
.builder()
.appName("Project")
.config("spark.master", "local[*]")
.config("spark.sql.warehouse.dir", warehouseLocation)
.config("spark.executor.memory", "5g")
.config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
.enableHiveSupport()
.getOrCreate()
仅供参考:每个函数 parsecsv 函数将 1 个文件下载到临时文件夹中,并在特定文件夹中创建一个数据框。文件大小为 1GB。另外,我正在尝试使用 java -cp jar 类来运行它。
【问题讨论】:
标签: scala apache-spark