【问题标题】:Spark Context Textfile: load multiple filesSpark Context Textfile:加载多个文件
【发布时间】:2014-04-30 21:00:17
【问题描述】:

我需要处理分散在各个目录中的多个文件。我想将所有这些加载到一个 RDD 中,然后对其执行 map/reduce。我看到 SparkContext 能够使用通配符从单个目录加载多个文件。我不知道如何从多个文件夹加载文件。

以下代码sn-p失败:

for fileEntry in files:
    fileName = basePath + "/" + fileEntry
    lines = sc.textFile(fileName)
    if retval == None:
        retval = lines
    else:
        retval = sc.union(retval, lines)

这在第三个循环中失败并显示以下错误消息:

retval = sc.union(retval, lines)
TypeError: union() takes exactly 2 arguments (3 given)

这很奇怪,因为我只提供了 2 个参数。任何指针表示赞赏。

【问题讨论】:

  • ..但第一个参数是self。从docs,你需要sc.union([retval,lines])
  • 让我试试。我很惊讶为什么这将适用于 2 个循环并在第三个循环中失败......
  • 成功了。谢谢乔纳森!
  • 我刚刚意识到您可以使用sc.textFile(','.join(files)) 一口气阅读它们。

标签: python apache-spark


【解决方案1】:

这个措辞怎么样?

sc.union([sc.textFile(basepath + "/" + f) for f in files])

在 Scala 中,SparkContext.union() 有两种变体,一种采用可变参数,另一种采用列表。 Python 中只有第二种存在(因为 Python 没有多态性)。

更新

您可以使用单个textFile 调用来读取多个文件。

sc.textFile(','.join(files))

【讨论】:

  • 谢谢丹尼尔。我的问题可能以 Python 为中心。您的 sn-p 似乎是 Scala,
  • 啊,为什么我没有意识到这一点?! Python 中没有函数多态性,因此只能暴露一种形式的 SparkContext.union()。他们选择公开接受列表的人,而不是接受可变参数的人。 (就像乔纳森所说的那样。)
  • 我修复了使用 Python 而不是 Scala 的答案。
  • 这个答案有错字,但我无法编辑它,因为它不是 6 个字符长:"\n" 应该是 "/"
  • 谢谢!犯奇怪的错误...\n/ 在键盘上根本不相近:)。
【解决方案2】:

我使用通配符解决了类似的问题。

例如我在要在 spark 中加载的文件中发现了一些特征,

目录

subdir1/folder1/x.txt

subdir2/folder2/y.txt

你可以用下面这句话

sc.textFile("dir/*/*/*.txt")

加载所有相关文件。

通配符'*'只适用于单级目录,不递归。

【讨论】:

    【解决方案3】:

    可以使用SparkContext的如下功能:

    wholeTextFiles(path: String, minPartitions: Int = defaultMinPartitions): RDD[(String, String)]

    从 HDFS、本地文件系统(在所有节点上可用)或任何 Hadoop 支持的文件系统 URI 读取文本文件目录。每个文件被读取为单个记录并以键值对的形式返回,其中键是每个文件的路径,值是每个文件的内容。

    https://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.SparkContext

    【讨论】:

    • 这在大多数情况下效果很好,但根据我的经验,当文件很大时,这不起作用。
    【解决方案4】:

    你可以用这个

    首先你可以获得一个缓冲区/S3 路径列表:

    import scala.collection.JavaConverters._
    import java.util.ArrayList
    import com.amazonaws.services.s3.AmazonS3Client
    import com.amazonaws.services.s3.model.ObjectListing
    import com.amazonaws.services.s3.model.S3ObjectSummary
    import com.amazonaws.services.s3.model.ListObjectsRequest
    
    def listFiles(s3_bucket:String, base_prefix : String) = {
        var files = new ArrayList[String]
    
        //S3 Client and List Object Request
        var s3Client = new AmazonS3Client();
        var objectListing: ObjectListing = null;
        var listObjectsRequest = new ListObjectsRequest();
    
        //Your S3 Bucket
        listObjectsRequest.setBucketName(s3_bucket)
    
        //Your Folder path or Prefix
        listObjectsRequest.setPrefix(base_prefix)
    
        //Adding s3:// to the paths and adding to a list
        do {
          objectListing = s3Client.listObjects(listObjectsRequest);
          for (objectSummary <- objectListing.getObjectSummaries().asScala) {
            files.add("s3://" + s3_bucket + "/" + objectSummary.getKey());
          }
          listObjectsRequest.setMarker(objectListing.getNextMarker());
        } while (objectListing.isTruncated());
    
        //Removing Base Directory Name
        files.remove(0)
    
        //Creating a Scala List for same
        files.asScala
      }
    

    现在将此 List 对象传递给以下代码,注意:sc 是 SQLContext 的对象

    var df: DataFrame = null;
      for (file <- files) {
        val fileDf= sc.textFile(file)
        if (df!= null) {
          df= df.unionAll(fileDf)
        } else {
          df= fileDf
        }
      }
    

    现在你得到了最终的统一 RDD,即 df

    可选,也可以在单个 BigRDD 中重新分区

    val files = sc.textFile(filename, 1).repartition(1)
    

    重新分区总是有效的:D

    【讨论】:

      猜你喜欢
      • 2015-06-21
      • 2020-07-27
      • 2018-04-18
      • 2019-12-23
      • 2016-11-11
      • 1970-01-01
      • 1970-01-01
      • 2013-12-18
      • 2017-05-06
      相关资源
      最近更新 更多