【问题标题】:Reading a text file from HDFS in Scala/Spark在 Scala/Spark 中从 HDFS 读取文本文件
【发布时间】:2017-12-31 06:01:10
【问题描述】:

我正在使用 Scala 和 Spark,并希望将 XML 文件作为单个字符串读取。 我正在努力寻找一种简洁的 Scala 风格的方法来做到这一点。

我的第一个想法是使用

val fileContents: RDD[String] = sparkContext.textfile(pathToFile)
val combinedContents: String = fileContents.reduce((line1, line2) => line1 + line2)

但我担心这会保持行的顺序,这对于保持字符串中包含的 xml 的完整性很重要。

我在网上找到的用于在 HDFS 中读取文件的其他内容涉及使用已弃用的方法,因此我想避免使用这些方法。有什么想法吗?

【问题讨论】:

  • 如果可以的话,我想避免使用非标准库,因为这会带来成本(支持、管理批准等)而且 xml 管理在应用程序的后期出现,我需要提供一个字符串。我现在并不关心它是 xml,这只是为了说明行的顺序很重要。
  • 那为什么不fileContents.collect呢?
  • 我担心是 sparkContext.textfile 阶段会导致订单丢失,但是最初的实验似乎表明并非如此,而 fileContents.collect.mkString 似乎可以工作。如果是这样,太好了,谢谢!是减少导致订单分崩离析还是我只是无法重复 .textfile 以破坏订单?

标签: xml scala apache-spark hdfs


【解决方案1】:

sc.textFile 返回一个带有“排序行”的 RDD。 请注意,如果您在提供的路径中有多个文件,则文件也将按字母顺序(文件名)分配给分区。 因此,作为结论,sc.textFile 保持行的顺序。

据我所知,查看 collect() 方法的实现,顺序也保持不变,所以没有理由不直接使用:

sc.textFile(pathToFile).collect()

这应该可行。

但是,如果您想为 collect 的不同实现做好准备(因为在文档中不能保证保持顺序),我建议的解决方案是使用 RDD 方法 zipWithIndex,它在哲学上等同于 scala 的方法名字。

所以我会这样做:

sc.textFile(pathToFile).zipWithIndex().collect().sortBy(_._2).map(_._1)

【讨论】:

  • sc.textFile(pathToFile).collect() 是我的选择。似乎我对来自 textFile() 的订单的担忧是没有根据的。
  • 据我所知 .collect() 在处理大文件时并不理想。您将如何处理较大文件的问题?
【解决方案2】:

选项:

  1. 读取整个文件:

sparkContext.wholeTextFiles(filePath)

但是如果你没有很多这样的文件,看起来像开销。

  1. 获取 HDFS 文件系统对象,并将文件作为 InputStream 读取。很多例子可用:HDFS FileSystems API example

【讨论】:

    猜你喜欢
    • 2021-06-18
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2017-08-07
    • 2021-06-25
    • 1970-01-01
    • 1970-01-01
    • 2019-08-22
    相关资源
    最近更新 更多