【问题标题】:How are large directory trees processed in using the Spark API?使用 Spark API 如何处理大型目录树?
【发布时间】:2015-11-19 18:49:46
【问题描述】:

我是新的 Spark 用户,我正在尝试处理位于 HDFS 文件系统上的大量 XML 文件集。在 1 台机器(实际上是 VM)的“开发”集群上大约有 150k 个文件,总计约 28GB。

这些文件在 HDFS 中被组织成一个目录结构,这样在一个父目录下就有大约一百个子目录。每个“子”目录包含几百到几千个 XML 文件之间的任何内容。

我的任务是解析每个 XML 文件,使用 XPath 表达式提取一些值,然后将结果保存到 HBase。我正在尝试使用 Apache Spark 来做到这一点,但我运气不佳。我的问题似乎是 Spark API 和 RDD 工作方式的结合。在这一点上,分享一些伪代码来表达我正在尝试做的事情可能是谨慎的:

RDD[String] filePaths = getAllFilePaths()
RDD[Map<String,String>] parsedFiles = filePaths.map((filePath) => {
    // Load the file denoted by filePath
    // Parse the file and apply XPath expressions
})
// After calling map() above, I should have an RDD[Map<String,String>] where
// the map is keyed by a "label" for an xpath expression, and the
// corresponding value is the result of the expression applied to the file 

所以,暂时忽略我写给 HBase 的部分,让我们专注于上述内容。我无法从 RDD map() 调用中加载文件。

我尝试了很多不同的方法,但都失败了:

  1. 使用callSparkContext.textFile("/my/path") 加载文件失败,因为SparkContext 不可序列化
  2. 使用 Hadoop API 中的 callFileSystem.open(path),其中 FileSystem 在 RDD 外部实例化失败,因为 FileSystem 不可序列化
  3. 使用从 Hadoop API 调用 FileSystem.open(path),其中 FileSystem内部被实例化,RDD 失败,因为程序用完了文件句柄。

替代方法包括尝试使用SparkContext.wholeTextFiles("/my/path/*"),因此我不必从 map() 调用中加载文件,因为程序内存不足而失败。这大概是因为它急切地加载文件。

有没有人在他们自己的工作中尝试过类似的事情,如果有,您使用了什么方法?

【问题讨论】:

  • 我认为wholeTextFiles 是要走的路。只需添加更多内存即可。
  • 感谢您的评论。不幸的是,我没有 28GB 的​​内存可以玩。一旦我收到一个更大的文件集,比如 500GB 而不是 28GB,采用这种方法就会崩溃。
  • 不,wholeTextFiles 只需要足够的内存来加载最大的文件。如果您获得更多文件,那很好,您将不需要更多内存。 (每个执行线程一次加载 1 个文件。您还可以限制执行线程的数量,因此每个线程有更多的内存。)

标签: java scala hadoop apache-spark


【解决方案1】:

尝试使用通配符读取整个目录。 val errorCount = sc.textFile("hdfs://some-directory/*")

其实spark可以读取整个hfs目录,引用自sparkdocumentation

Spark 的所有基于文件的输入法,包括 textFile,都支持 也可以在目录、压缩文件和通配符上运行。为了 例如,您可以使用textFile("/my/directory")textFile("/my/directory/*.txt")textFile("/my/directory/*.gz")

【讨论】:

  • 感谢您的回复。我曾考虑过这一点,但那是超过 150,000 个文件的大约 28GB 数据 - 如果该方法急切地将文件加载到 RDD 中,那么它将不适合内存。
  • 你能把这些文件按目录分成几批吗?我的意思是您可以一次将文件分成 5 或 8 个输入 RDD。顺便说一句,这不是 RDD map() 应该做的。
  • 我想我可以,但这必须以编程方式完成。你对 map() 的评论是什么意思?
  • @Jon 你所做的是在 RDD 之间,而不是在一个 RDD 内,所以这不是 RDD 的 map() 函数可以做的。
  • sc.textFile 会将所有文件读入单个 RDD,每个文件行对应一个 RDD 行。您将无法知道每个文件的开始和结束位置。
猜你喜欢
  • 1970-01-01
  • 1970-01-01
  • 2015-03-17
  • 2017-03-04
  • 2016-02-04
  • 2016-04-22
  • 2017-10-28
  • 2012-07-21
  • 2019-07-23
相关资源
最近更新 更多