【问题标题】:Spark: Process Files in different order then returnedSpark:以不同的顺序处理文件然后返回
【发布时间】:2016-02-19 15:32:52
【问题描述】:

我正在使用在云中运行的 Spark。我的存储不是传统的 HDFS,但我通过 URL 连接到我的文件。因此,例如我可以执行以下操作,Spark 将接收目录中的所有文件。我还可以使用任何其他 HDFS 函数调用。

sc.TextFile("hdfs://mystorage001/folder/month1/*")

我的数据分布在 5 个不同的驱动器之间,我希望在从每个驱动器读取数据之间进行循环,这样我就可以并行读取所有 5 个驱动器。我目前可以执行以下操作并处理我的所有数据,但它不会并行读取驱动器。相反,spark 会读取一个驱动器中的所有文件,然后移动到下一个驱动器。

sc.TextFile("hdfs://mystorage001/folder/month1/*, hdfs://mystorage002/folder/month2/*, hdfs://mystorage003/folder/month3/*, hdfs://mystorage004/folder/month4/*,hdfs://mystorage005/folder/month5/*")

我有 100 个执行者。所以我也试过这个,但这给了我最差的表现。

sc.TextFile("hdfs://mystorage001/folder/month1/*, 20) union sc.TextFile("hdfs://mystorage002/folder/month2/*, 20) union sc.TextFile("hdfs://mystorage003/folder/month3/*, 20) union sc.TextFile("hdfs://mystorage004/folder/month4/*, 20) union sc.TextFile("hdfs://mystorage005/folder/month5/*")

我知道在每个目录中,我都有名为 000000_0、000001_0、000002_0 等的文件...所以如果我可以按名称的那一部分订购 spark 读取的文件列表,我认为这将完成我想要的,但我想出如何返回列表的唯一方法是通过wholeTextFile() 无论如何都需要先加载所有数据。

【问题讨论】:

标签: scala apache-spark hdfs


【解决方案1】:
import org.apache.hadoop.fs._
val fs = FileSystem.get(sc.hadoopConfiguration)
val path = Array(new Path("hdfs://mystorage001/folder/month1/"), 
new Path("hdfs://mystorage001/folder/month2/"), 
new Path("hdfs://mystorage001/folder/month3/"), 
new Path("hdfs://mystorage001/folder/month4/"), 
new Path("hdfs://mystorage001/folder/month5/"))
path.map(fs.listStatus(_)).flatMap(_.zipWithIndex).sortBy(_._2).map(_._1.getPath).mkString(",")

【讨论】:

    猜你喜欢
    • 2022-01-03
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2014-03-17
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多