【问题标题】:Read all files in a nested folder in Spark读取 Spark 中嵌套文件夹中的所有文件
【发布时间】:2015-08-26 18:01:12
【问题描述】:

如果我们有一个文件夹folder 包含所有.txt 文件,我们可以使用sc.textFile("folder/*.txt") 读取它们。但是,如果我有一个文件夹 folder 包含更多名为 datewise 的文件夹,例如 0304、...,其中还包含一些 .log 文件。如何在 Spark 中阅读这些内容?

在我的例子中,结构更加嵌套和复杂,因此首选通用答案。

【问题讨论】:

    标签: apache-spark


    【解决方案1】:

    如果目录结构是规则的,我们可以这样说:

    folder
    ├── a
    │   ├── a
    │   │   └── aa.txt
    │   └── b
    │       └── ab.txt
    └── b
        ├── a
        │   └── ba.txt
        └── b
            └── bb.txt
    

    您可以为每一级嵌套使用* 通配符,如下所示:

    >>> sc.wholeTextFiles("/folder/*/*/*.txt").map(lambda x: x[0]).collect()
    
    [u'file:/folder/a/a/aa.txt',
     u'file:/folder/a/b/ab.txt',
     u'file:/folder/b/a/ba.txt',
     u'file:/folder/b/b/bb.txt']
    

    【讨论】:

    • 这解决了我的特殊问题。顺便说一句,如果目录结构不规则怎么办?
    • 然后事情开始变得混乱 :) 想法或多或少是相同的,但您不太可能准备可以轻松重用的模式。您始终可以使用普通工具来遍历文件系统并收集路径,而不是硬编码。
    • 为什么这不适用于/folder/**/*.txt?我的目录结构基本完全相同,我想用sc.wholeTextFiles('data/**/*.json') 打开所有目录,但这似乎不起作用..?
    • @zero323,我无法在 Wholetextfile 中使用 whilecard 来获取错误 llegalArgumentException: 'java.net.URISyntaxException: Expected scheme-specific part at index.
    • 在这种情况下,由于“a”目录下的所有文件都是.txt,所以最后一个/*.txt是不必要的。
    【解决方案2】:

    Spark 3.0 提供了一个选项 recursiveFileLookup 来从递归子文件夹加载文件。

    val df= sparkSession.read
           .option("recursiveFileLookup","true")
          .option("header","true")
          .csv("src/main/resources/nested")
    

    这会递归地从 src/main/resources/nested 及其子文件夹中加载文件。

    【讨论】:

      【解决方案3】:

      如果你只想使用以“a”开头的文件,你可以使用

      sc.wholeTextFiles("/folder/a*/*/*.txt") or sc.wholeTextFiles("/folder/a*/a*/*.txt")
      

      也是。我们可以使用 * 作为通配符。

      【讨论】:

        【解决方案4】:

        sc.wholeTextFiles("/directory/201910*/part-*.lzo") 获取所有匹配文件名称,而不是文件内容。

        如果你想加载一个目录中所有匹配文件的内容,你应该使用

        sc.textFile("/directory/201910*/part-*.lzo")
        

        并设置递归读取目录!

        sc._jsc.hadoopConfiguration().set("mapreduce.input.fileinputformat.input.dir.recursive", "true")
        

        提示: scala 与 python 不同,下面设置使用 scala!

        sc.hadoopConfiguration.set("mapreduce.input.fileinputformat.input.dir.recursive", "true")
        

        【讨论】:

          猜你喜欢
          • 1970-01-01
          • 1970-01-01
          • 2017-08-19
          • 1970-01-01
          • 1970-01-01
          • 1970-01-01
          • 2018-06-06
          • 1970-01-01
          • 2011-08-20
          相关资源
          最近更新 更多