【Title】: Spark job hanging on an EC2 m4.10x machine
【Posted】: 2018-04-15 00:27:38
【Question】:

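I have the code below to launch a Spark job. Parallelization works fine when I process fewer than 40 files (the maximum number of cores on my machine), but when I process more files than that, the job gets stuck. Any suggestions?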

```scala
import java.io.File
import java.nio.file.{Files, Paths}
import org.apache.spark.sql.SaveMode
import org.apache.spark.sql.functions.lit

// Processor, TelemetryFileSeeker, bucketName, urlPrefix, sc, sparksession, logger and
// CSV_FORMAT come from elsewhere in the project and are not shown in the question.
object Cleanup extends Processor {
  def main(args: Array[String]): Unit = {
    val fileSeeker = new TelemetryFileSeeker("Config")
    // (file path, parent folder) pairs for at most 100 .gz files in the date range
    val files = fileSeeker
      .searchFiles(bucketName, urlPrefix, "2018-01-01T00:00:00.000Z", "2018-04-30T00:00:00.000Z")
      .filter(_.endsWith(".gz"))
      .map(each => (each, each.slice(0, each.lastIndexOf("/"))))
      .slice(0, 100)
    if (files.nonEmpty) {
      println("Number of Files" + files.length)
      // One task per file; each task calls changeFormat, which reads and writes DataFrames
      // and therefore starts further Spark jobs from inside the running task.
      sc.parallelize(files).map(each => changeFormat(each)).collect()
    }
  }

  // Downloads one .gz file, keeps columns pa1-pa3, adds a constant pa4 column,
  // writes the result as gzipped CSV and uploads it under "new/<parent folder>".
  def changeFormat(file: (String, String)): Unit = {
    val fileProcessor = new Processor("Config", sparksession)
    // Per-call scratch folders, made unique with a random UUID
    val uuid = java.util.UUID.randomUUID.toString
    val tempInput = "inputfolder" + uuid
    val tempOutput = "outputfolder" + uuid

    val inpaths = Paths.get(tempInput)
    val outpaths = Paths.get(tempOutput)
    if (Files.notExists(inpaths)) Files.createDirectory(inpaths)
    if (Files.notExists(outpaths)) Files.createDirectory(outpaths)
    val downloadedFiles = fileProcessor.downloadAndUnzip(bucketName, List(file._1), tempInput)
    val parsedFiles = fileProcessor.parseCSV(downloadedFiles)
    parsedFiles
      .select("pa1", "pa2", "pa3")
      .withColumn("pa4", lit(0.0))
      .write
      .mode(SaveMode.Overwrite)
      .format(CSV_FORMAT)
      .option("codec", "org.apache.hadoop.io.compress.GzipCodec")
      .save(tempOutput)

    // Gzipped part files written by Spark into the local output folder
    val processedFiles = new File(tempOutput).listFiles.filter(_.getName.endsWith(".gz"))
    val filesNames = processedFiles.map(_.getName).toList
    val filesPaths = processedFiles.map(_.getPath).toList

    fileProcessor.cleanUpRemote(bucketName, "new/" + file._2, filesNames)
    fileProcessor.uploadFiles(bucketName, "new/" + file._2, filesPaths)
    fileProcessor.cleanUpLocal(tempInput, tempOutput)

    val remoteFiles = fileProcessor.checkRemote(bucketName, "new/" + file._2, filesNames)
    logger.info("completed " + file._1)
  }
}
```
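Because `changeFormat` reads and writes DataFrames, every call launches Spark jobs of its own, and in the code above those calls are made from inside the tasks of the outer `sc.parallelize(files).map(...)` job. A common alternative, offered here as a suggestion rather than as the asker's approach, is to keep the per-file loop on the driver and get concurrency from a small, bounded thread pool: Spark accepts jobs submitted in parallel from separate driver threads. The sketch below shows only that pattern in plain Scala; `DriverSideRunner`, `runBounded` and the stand-in processing function are hypothetical names.

```scala
import java.util.concurrent.Executors
import scala.concurrent.{Await, ExecutionContext, Future}
import scala.concurrent.duration.Duration

object DriverSideRunner {
  // Applies `process` to every item from a fixed pool of driver-side threads,
  // so at most `maxConcurrent` calls run at the same time. Results keep the input order.
  def runBounded[A, B](items: Seq[A], maxConcurrent: Int)(process: A => B): Seq[B] = {
    require(maxConcurrent > 0, "maxConcurrent must be positive")
    val pool = Executors.newFixedThreadPool(maxConcurrent)
    implicit val ec: ExecutionContext = ExecutionContext.fromExecutorService(pool)
    try {
      val futures = items.map(item => Future(process(item)))
      // Block until every call has finished; a failure in any call is rethrown here.
      Await.result(Future.sequence(futures), Duration.Inf)
    } finally {
      pool.shutdown()
    }
  }

  def main(args: Array[String]): Unit = {
    // Hypothetical stand-in for changeFormat: it only measures the path length.
    val files = (1 to 100).map(i => s"day$i/part-0.gz")
    val results = runBounded(files, maxConcurrent = 4)(path => path.length)
    println(s"processed ${results.size} files")
  }
}
```

In the question's program the call would look roughly like `runBounded(files, maxConcurrent = 4)(changeFormat)`, so each `changeFormat` submits its read and write jobs from a driver thread instead of from inside a task that is itself holding one of the `local[*]` threads. The pool size of 4 is an arbitrary illustration, not a tuned value.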

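Spark configuration:

```scala
import org.apache.spark.sql.SparkSession

lazy val spark = SparkSession
  .builder()
  .appName("Project")
  .config("spark.master", "local[*]") // one worker thread per logical core, all inside this JVM
  .config("spark.sql.warehouse.dir", warehouseLocation)
  // In local mode tasks run inside the launching JVM, so its heap (-Xmx) is what limits memory
  .config("spark.executor.memory", "5g")
  .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
  .enableHiveSupport()
  .getOrCreate()
```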
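With `spark.master` set to `local[*]`, Spark runs one worker thread per logical core in a single JVM, so there are as many task slots as cores (40 according to the question). One plausible reading of the symptom, not confirmed in the thread: each outer task running `changeFormat` keeps its slot while it waits for the Spark jobs it starts (the CSV read and write), and those inner jobs need free slots of their own. Once there are at least as many files as slots, no slot is left and nothing can finish. The sketch below only does that slot arithmetic; `LocalSlots` and `freeSlotsForInnerJobs` are hypothetical names, not Spark APIs.

```scala
object LocalSlots {
  // Task slots left for the inner jobs once the outer tasks are running:
  // the outer job occupies one slot per file, up to the total number of slots.
  def freeSlotsForInnerJobs(files: Int, slots: Int): Int = {
    require(files >= 0 && slots > 0, "files must be >= 0 and slots > 0")
    slots - math.min(files, slots)
  }

  def main(args: Array[String]): Unit = {
    // 40 matches the core count given in the question; on another machine,
    // local[*] would use Runtime.getRuntime.availableProcessors threads instead.
    val slots = 40
    for (files <- Seq(10, 38, 39, 40, 100)) {
      val free = freeSlotsForInnerJobs(files, slots)
      val status = if (free == 0) "no slot left for inner jobs" else "inner jobs can still run"
      println(s"files=$files free=$free: $status")
    }
  }
}
```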

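For reference: each parseCSV call works on one file that has been downloaded into a temporary folder and creates a DataFrame from that folder. Each file is about 1 GB. I am running the job with `java -cp <jar> <class>`.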

Tags: scala apache-spark


【Answer 1】:

Although I couldn't find the exact cause, I worked around the problem by using `List.grouped` to pass only 38 files at a time.
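The workaround amounts to submitting the files in fixed-size batches and waiting for each batch to finish before starting the next. A minimal plain-Scala sketch of that batching, assuming a hypothetical `runBatch` callback that blocks until the batch is done (in the question's program it would be something like `batch => sc.parallelize(batch).map(changeFormat).collect()`):

```scala
object BatchedSubmit {
  // Runs `runBatch` on consecutive slices of at most `batchSize` items, one slice at a time,
  // and returns the size of each slice in the order it was processed.
  def inBatches[A](items: List[A], batchSize: Int)(runBatch: List[A] => Unit): List[Int] = {
    require(batchSize > 0, "batchSize must be positive")
    items.grouped(batchSize).map { batch =>
      runBatch(batch) // expected to block until this batch has finished
      batch.size
    }.toList // forces the iterator, so batches run strictly one after another
  }

  def main(args: Array[String]): Unit = {
    val files = (1 to 100).map(i => s"file$i.gz").toList
    // Hypothetical stand-in for sc.parallelize(batch).map(changeFormat).collect()
    val sizes = inBatches(files, 38)(batch => println(s"processing ${batch.size} files"))
    println(s"batch sizes: $sizes")
  }
}
```

Batches of 38 stay below the 40 cores reported in the question, which leaves a couple of `local[*]` threads that the outer tasks never occupy; the answer itself does not say why 38 was chosen.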
