【问题标题】:How to read multiple text files into a single RDD?如何将多个文本文件读入单个 RDD?
【发布时间】:2014-07-24 15:46:00
【问题描述】:

我想从 hdfs 位置读取一堆文本文件,并使用 spark 在迭代中对其执行映射。

JavaRDD<String> records = ctx.textFile(args[1], 1); 一次只能读取一个文件。

我想读取多个文件并将它们作为单个 RDD 处理。如何?

【问题讨论】:

    标签: apache-spark


    【解决方案1】:

    您可以指定整个目录、使用通配符甚至 CSV 的目录和通配符。例如:

    sc.textFile("/my/dir1,/my/paths/part-00[0-5]*,/another/dir,/a/specific/file")
    

    正如 Nick Chammas 所指出的,这是 Hadoop 的 FileInputFormat 的曝光,因此这也适用于 Hadoop(和 Scalding)。

    【讨论】:

    • 是的,这是将多个文件作为单个 RDD 打开的最方便的方法。这里的 API 只是Hadoop's FileInputFormat API 的曝光,所以所有相同的Path 选项都适用。
    • sc.wholeTextFiles 对于非行分隔的数据很方便
    • 奇怪的是,如果你这样做并指定并行性,比如sc.textFile(multipleCommaSeparatedDirs,320),它会导致19430 总任务而不是320 ...它的行为就像union 这也导致从非常低的并行度到疯狂数量的任务
    • 我终于找到了这个邪恶的文件模式匹配是如何工作的stackoverflow.com/a/33917492/306488,所以我不再需要逗号分隔了
    • @femibyte 我不这么认为,虽然我不知道为什么除了wholeTextFiles 之外的任何情况下您都想知道文件名。你的用例是什么?如果您使用与文件相同数量的分区,我可以想到一种解决方法...
    【解决方案2】:

    如下使用union

    val sc = new SparkContext(...)
    val r1 = sc.textFile("xxx1")
    val r2 = sc.textFile("xxx2")
    ...
    val rdds = Seq(r1, r2, ...)
    val bigRdd = sc.union(rdds)
    

    那么bigRdd就是所有文件的RDD。

    【讨论】:

    • 谢谢cloud,这样我就可以阅读所有我想要的文件,但是只有一个!但是,我还是要写很多东西……
    【解决方案3】:

    您可以使用单个 textFile 调用来读取多个文件。斯卡拉:

    sc.textFile(','.join(files)) 
    

    【讨论】:

    • 和相同的python语法
    • 我认为这是 only python 语法。等效的 Scala 是 sc.textFile(files.mkString(","))
    【解决方案4】:

    你可以用这个

    首先你可以获得一个缓冲区/S3 路径列表:

    import scala.collection.JavaConverters._
    import java.util.ArrayList
    import com.amazonaws.services.s3.AmazonS3Client
    import com.amazonaws.services.s3.model.ObjectListing
    import com.amazonaws.services.s3.model.S3ObjectSummary
    import com.amazonaws.services.s3.model.ListObjectsRequest
    
    def listFiles(s3_bucket:String, base_prefix : String) = {
        var files = new ArrayList[String]
    
        //S3 Client and List Object Request
        var s3Client = new AmazonS3Client();
        var objectListing: ObjectListing = null;
        var listObjectsRequest = new ListObjectsRequest();
    
        //Your S3 Bucket
        listObjectsRequest.setBucketName(s3_bucket)
    
        //Your Folder path or Prefix
        listObjectsRequest.setPrefix(base_prefix)
    
        //Adding s3:// to the paths and adding to a list
        do {
          objectListing = s3Client.listObjects(listObjectsRequest);
          for (objectSummary <- objectListing.getObjectSummaries().asScala) {
            files.add("s3://" + s3_bucket + "/" + objectSummary.getKey());
          }
          listObjectsRequest.setMarker(objectListing.getNextMarker());
        } while (objectListing.isTruncated());
    
        //Removing Base Directory Name
        files.remove(0)
    
        //Creating a Scala List for same
        files.asScala
      }
    

    现在将此 List 对象传递给以下代码,注意:sc 是 SQLContext 的对象

    var df: DataFrame = null;
      for (file <- files) {
        val fileDf= sc.textFile(file)
        if (df!= null) {
          df= df.unionAll(fileDf)
        } else {
          df= fileDf
        }
      }
    

    现在你得到了一个最终的统一 RDD,即 df

    可选,也可以在单个 BigRDD 中重新分区

    val files = sc.textFile(filename, 1).repartition(1)
    

    重新分区总是有效的:D

    【讨论】:

    • 这不是意味着文件列表必须相对较小吗?不是数百万个文件。
    • 我们可以并行读取列出的文件的操作吗?类似 sc.parallelize 的东西?
    • @MathieuLongtin :如果您可以将分区发现应用到您的 Spark 代码,那么它会很棒,否则您需要做同样的事情。我曾经在大约一分钟内打开 10k 个文件。
    • @lazywiz 如果您不想创建单个 rdd,那么只需删除重新分区操作即可。
    【解决方案5】:

    在 PySpark 中,我发现了另一种有用的解析文件的方法。也许在 Scala 中有一个等价物,但我对想出一个工作翻译感到不舒服。实际上,这是一个添加了标签的 textFile 调用(在下面的示例中,key = filename, value = 文件中的 1 行)。

    “带标签的”文本文件

    输入:

    import glob
    from pyspark import SparkContext
    SparkContext.stop(sc)
    sc = SparkContext("local","example") # if running locally
    sqlContext = SQLContext(sc)
    
    for filename in glob.glob(Data_File + "/*"):
        Spark_Full += sc.textFile(filename).keyBy(lambda x: filename)
    

    输出:数组,每个条目包含一个元组,使用文件名作为键,值 = 文件的每一行。 (从技术上讲,使用这种方法,除了实际的文件路径名称之外,您还可以使用不同的键——也许是一个散列表示来节省内存)。 IE。

    [('/home/folder_with_text_files/file1.txt', 'file1_contents_line1'),
     ('/home/folder_with_text_files/file1.txt', 'file1_contents_line2'),
     ('/home/folder_with_text_files/file1.txt', 'file1_contents_line3'),
     ('/home/folder_with_text_files/file2.txt', 'file2_contents_line1'),
      ...]
    

    您也可以将任一行重新组合为行列表:

    Spark_Full.groupByKey().map(lambda x: (x[0], list(x[1]))).collect()

    [('/home/folder_with_text_files/file1.txt', ['file1_contents_line1', 'file1_contents_line2','file1_contents_line3']),
     ('/home/folder_with_text_files/file2.txt', ['file2_contents_line1'])]
    

    或将整个文件重新组合回单个字符串(在此示例中,结果与从 wholeTextFiles 获得的结果相同,但字符串“file:”从文件路径中删除。):

    Spark_Full.groupByKey().map(lambda x: (x[0], ' '.join(list(x[1])))).collect()

    【讨论】:

    • 当我运行这行代码时 - Spark_Full += sc.textFile(filename).keyBy(lambda x: filename) 我得到了错误,即TypeError: 'PipelinedRDD' object is not iterable。我的理解是,该行创建了一个不可变的 RDD,所以我想知道您如何能够将它附加到另一个变量?
    【解决方案6】:

    你可以使用

    JavaRDD<String , String> records = sc.wholeTextFiles("path of your directory")
    

    在这里,您将获得文件的路径和该文件的内容。因此您可以一次执行整个文件的任何操作,从而节省开销

    【讨论】:

      【解决方案7】:

      sc.textFile的所有答案都是正确的

      我只是想知道为什么不wholeTextFiles 例如,在这种情况下......

      val minPartitions = 2
      val path = "/pathtohdfs"
          sc.wholeTextFiles(path,minPartitions)
            .flatMap{case (path, text) 
          ...
      

      一个限制是,我们必须加载小文件,否则性能会很差并可能导致 OOM。

      注意:

      • 整个文件应该适合内存
      • 适用于不能按行拆分的文件格式...例如 XML 文件

      进一步参考visit

      【讨论】:

      • 或者只是sc.wholeTextFiles(folder).flatMap...
      • sc.wholeTextFiles(“/path/to/dir”)
      【解决方案8】:

      有一个直接的清洁解决方案可用。使用 wholeTextFiles() 方法。这将获取一个目录并形成一个键值对。返回的 RDD 将是一对 RDD。 在下面找到Spark docs的描述:

      SparkContext.wholeTextFiles 允许您读取包含多个小文本文件的目录,并将每个文件作为(文件名,内容)对返回。这与 textFile 不同,后者会在每个文件中每行返回一条记录

      【讨论】:

        【解决方案9】:

        试试这个 用于将 DataFrame 写入外部存储系统(例如文件系统、键值存储等)的接口。使用 DataFrame.write() 来访问它。

        1.4 版中的新功能。

        csv(path, mode=None, compression=None, sep=None, quote=None, escape=None, header=None, nullValue=None, escapeQuotes=None, quoteAll=None, dateFormat=None, timestampFormat=None ) 将 DataFrame 的内容以 CSV 格式保存在指定路径。

        参数: path – 任何 Hadoop 支持的文件系统中的路径 模式 - 指定数据已存在时保存操作的行为。

        append:将此 DataFrame 的内容附加到现有数据。 覆盖:覆盖现有数据。 忽略:如果数据已经存在,则静默忽略此操作。 错误(默认情况):如果数据已经存在,则抛出异常。 压缩 – 保存到文件时使用的压缩编解码器。这可以是已知的不区分大小写的缩写名称之一(none、bzip2、gzip、lz4、snappy 和 deflate)。 sep – 将单个字符设置为每个字段和值的分隔符。如果设置了 None,则使用默认值 ,。 quote – 设置用于转义引用值的单个字符,其中分隔符可以是值的一部分。如果设置了 None,它使用默认值,"。如果你想关闭引号,你需要设置一个空字符串。 escape – 设置用于在已引用的值内转义引号的单个字符。如果设置了 None,则使用默认值,\ escapeQuotes – 一个标志,指示是否应始终将包含引号的值括在引号中。如果设置了 None,它使用默认值 true,转义所有包含引号字符的值。 quoteAll – 一个标志,指示是否所有值都应始终括在引号中。如果设置了 None,它使用默认值 false,只转义包含引号字符的值。 header – 将列的名称写为第一行。如果设置了 None,它将使用默认值 false。 nullValue – 设置空值的字符串表示。如果设置了 None,它将使用默认值空字符串。 dateFormat – 设置表示日期格式的字符串。自定义日期格式遵循 java.text.SimpleDateFormat 中的格式。这适用于日期类型。如果设置了 None,则使用默认值 yyyy-MM-dd。 timestampFormat – 设置表示时间戳格式的字符串。自定义日期格式遵循 java.text.SimpleDateFormat 中的格式。这适用于时间戳类型。如果设置了 None,则使用默认值 yyyy-MM-dd'T'HH:mm:ss.SSSZZ。

        【讨论】:

          【解决方案10】:
          rdd = textFile('/data/{1.txt,2.txt}')
          

          【讨论】:

            猜你喜欢
            • 2015-02-13
            • 2020-01-29
            • 1970-01-01
            • 2020-05-18
            • 2017-04-29
            • 2021-03-08
            • 1970-01-01
            • 1970-01-01
            • 1970-01-01
            相关资源
            最近更新 更多