【问题标题】:Apache Spark on YARN: Large number of input data files (combine multiple input files in spark)YARN 上的 Apache Spark:大量输入数据文件(在 spark 中合并多个输入文件)
【发布时间】:2015-08-20 19:58:21
【问题描述】:

需要有关实施最佳实践的帮助。 运行环境如下:

  • 日志数据文件不定期到达。
  • 日志数据文件的大小为 3.9KB 到 8.5MB。平均约为 1MB。
  • 一个数据文件的记录数从13行到22000行。平均约为 2700 行。
  • 数据文件必须在聚合前进行后处理。
  • 可以更改后处理算法。
  • 后处理文件与原始数据文件分开管理,因为后处理算法可能会改变。
  • 执行每日聚合。所有后处理的数据文件必须逐条过滤并计算聚合(平均值,最大最小值...)。
  • 由于聚合是细粒度的,聚合后的记录数不会那么少。它可以是原始记录数量的一半左右。
  • 在某一时刻,后处理文件的数量可以达到 200,000 左右。
  • 数据文件应该可以单独删除。

在测试中,我尝试通过 Spark 处理 160,000 个后处理文件,该文件从带有 glob 路径的 sc.textFile() 开始,但由于驱动程序进程出现 OutOfMemory 异常而失败。

处理此类数据的最佳做法是什么? 我应该使用 HBase 而不是普通文件来保存后处理数据吗?

【问题讨论】:

    标签: hadoop apache-spark hadoop-yarn


    【解决方案1】:

    我们编写了自己的加载器。它解决了我们在 HDFS 中处理小文件的问题。它使用 Hadoop CombineFileInputFormat。 在我们的例子中,它将映射器的数量从 100000 减少到大约 3000 并且使工作显着加快。

    https://github.com/RetailRocket/SparkMultiTool

    例子:

    import ru.retailrocket.spark.multitool.Loaders 
    val sessions = Loaders.combineTextFile(sc, "file:///test/*") 
    // or val sessions = Loaders.combineTextFile(sc, conf.weblogs(), size = 256, delim = "\n") 
    // where size is split size in Megabytes, delim - line break character 
    println(sessions.count())
    

    【讨论】:

    • 感谢您分享此内容。我认为 size 参数特别有价值,因为它不能在 coalesce() 上指定。
    • 此解决方案比合并更好,因为它在地图阶段有效,但在之后合并。
    • 由于hadoop现在支持CombineTextInputFormat(至少从2.2开始),组合小输入文件可以用sc.newAPIHadoopFile()完成,无需实现自定义类。
    【解决方案2】:

    我很确定您获得 OOM 的原因是因为处理了如此多的小文件。你想要的是合并输入文件,这样你就不会得到这么多的分区。我尝试将我的作业限制在大约 10k 个分区。

    textFile 之后,您可以使用.coalesce(10000, false) ...虽然不是100% 确定这会起作用,因为我已经有一段时间没有这样做了,请告诉我。所以试试

    sc.textFile(path).coalesce(10000, false)
    

    【讨论】:

    • 谢谢!我会试试的。
    • 成功了!实际上我使用了合并因子 1227,这是 Spark 处理包含整个记录的大单个文件时的分区数。但是作业运行速度较慢(如预期的那样),并且似乎所有文件的信息仍在传输到驱动程序进程,当涉及的文件过多时可能导致OOM。但是 1.68GB 的​​驱动进程对于 168016 文件来说还不错。
    • 好吧,我们有一个单独的简单工作,专门用于减少文件数量,因为它是如此重要。一旦我不得不在 5 中运行它,就需要 5 个子集
    • 嘿,你能帮个忙吗 :) 你能把“在 spark 中组合多个输入文件”放在你的问题中 - 甚至放在标题中......它会让你更容易谷歌这个问题的人。
    • 更改了问题标题。谢谢。
    【解决方案3】:

    你可以用这个

    首先,您可以获得 S3 路径的缓冲区/列表/HDFS 或本地路径相同

    如果您正在尝试使用 Amazon S3,那么:

    import scala.collection.JavaConverters._
    import java.util.ArrayList
    import com.amazonaws.services.s3.AmazonS3Client
    import com.amazonaws.services.s3.model.ObjectListing
    import com.amazonaws.services.s3.model.S3ObjectSummary
    import com.amazonaws.services.s3.model.ListObjectsRequest
    
    def listFiles(s3_bucket:String, base_prefix : String) = {
        var files = new ArrayList[String]
    
        //S3 Client and List Object Request
        var s3Client = new AmazonS3Client();
        var objectListing: ObjectListing = null;
        var listObjectsRequest = new ListObjectsRequest();
    
        //Your S3 Bucket
        listObjectsRequest.setBucketName(s3_bucket)
    
        //Your Folder path or Prefix
        listObjectsRequest.setPrefix(base_prefix)
    
        //Adding s3:// to the paths and adding to a list
        do {
          objectListing = s3Client.listObjects(listObjectsRequest);
          for (objectSummary <- objectListing.getObjectSummaries().asScala) {
            files.add("s3://" + s3_bucket + "/" + objectSummary.getKey());
          }
          listObjectsRequest.setMarker(objectListing.getNextMarker());
        } while (objectListing.isTruncated());
    
        //Removing Base Directory Name
        files.remove(0)
    
        //Creating a Scala List for same
        files.asScala
      }
    

    现在将此 List 对象传递给以下代码,注意:sc 是 SQLContext 的对象

    var df: DataFrame = null;
      for (file <- files) {
        val fileDf= sc.textFile(file)
        if (df!= null) {
          df= df.unionAll(fileDf)
        } else {
          df= fileDf
        }
      }
    

    现在你得到了一个最终的统一 RDD,即 df

    可选,也可以在单个 BigRDD 中重新分区

    val files = sc.textFile(filename, 1).repartition(1)
    

    重新分区总是有效的:D

    【讨论】:

      猜你喜欢
      • 1970-01-01
      • 1970-01-01
      • 2016-06-15
      • 1970-01-01
      • 2017-10-18
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      相关资源
      最近更新 更多