【问题标题】:Can spark ignore unreadable files?spark可以忽略不可读的文件吗?
【发布时间】:2019-05-15 22:05:27
【问题描述】:

我有一个如下形式的文件结构

s3://<bucket>/year=2018/month=11/day=26/hour=10/department=x/part-xxxxx.gz.parquet

我的 AWS 凭证无法访问所有 department= 值,只有少数。

我正在尝试执行

df = spark.read.parquet("s3://<bucket>/year=2018/") 

这失败了

java.io.IOException: Could not read footer: java.io.IOException: Could not read footer for file FileStatus{path=s3://<bucket>/year=2018/month=11/day=26/hour=10/department=yyyyyy/part-xxxxx.gz.parquet; isDirectory=false; length=104448; replication=0; blocksize=0; modification_time=0; access_time=0; owner=; group=; permission=rw-rw-rw-; isSymlink=false}

显然失败了,因为我只能访问department=x 而不能访问department=yyyy,我的问题是:有没有办法默默地忽略这些?

我目前的解决方法是像这样构造仅有效路径

   tmpl = 's3://<bucket>/year=2018/month=11/day=26/hour={hour}/department=x/'
   df = spark.read.parquet(*list(tmpl.format(hour=hour) for hour in range(1,24)))

这很麻烦因为

  1. 某些路径可能不存在(缺少数据数小时等)。
  2. spark.read.parquet 不接受列表或生成器作为输入,因此我不得不使用 splat/unpack 运算符将所有内容转换为单独的参数。不确定这是否适用于数千条路径。

有没有更好的方法来加载这些数据而不改变文件结构(我无法控制)?

【问题讨论】:

    标签: apache-spark pyspark apache-spark-sql


    【解决方案1】:

    它显然失败了,因为我只能访问 department=x 而不能 部门=年年,我的问题是:有没有办法默默地 忽略那些?

    根据文件状态对象,您对日志中提到的文件具有读写访问权限 (permission=rw-rw-rw-)。可能是文件路径的其他问题。

    java.io.IOException:无法读取页脚:java.io.IOException:可以 不读取文件的页脚 FileStatus{path=s3:///year=2018/month=11/day=26/hour=10/department=yyyyyy/part-xxxxx.gz.parquet;是目录=假;长度=104448;复制=0;块大小=0; 修改时间=0;访问时间=0;所有者=;组=; 权限=rw-rw-rw-; isSymlink=false}

    从上述错误中,您以不正确的字符串格式传递 fileStatus 对象。

    你必须通过fileStat.getPath.toString,即在你的情况下path=s3:///year=2018/month=11/day=26/hour=10/department=yyyyyy/part-xxxxx.gz.parquet;

    df = spark.read.parquet(...) 会起作用。

    如果你想传递拼花文件的文件夹,你可以传递。

    或者你想过滤下面的示例代码 scala sn-p 你可以使用的某些文件

    /**
        * getAllFilePath.
        *
        * @param filePath Path
        * @param fs       FileSystem
        * @return list of absolute file path present in given path
        * @throws FileNotFoundException
        * @throws IOException
        */
      @throws[FileNotFoundException]
      @throws[IOException]
      def getAllFilePath(filePath: Path, fs: FileSystem): ListBuffer[String] = {
        val fileList = new ListBuffer[String]
        val fileStatus = fs.listStatus(filePath)
        for (fileStat <- fileStatus) {
          logInfo(s"file path Name : ${fileStat.getPath.toString} length is  ${fileStat.getLen}")
          if (fileStat.isDirectory) fileList ++= (getAllFilePath(fileStat.getPath, fs))
          else if (fileStat.getLen > 0 && !fileStat.getPath.toString.isEmpty) {
            logInfo("fileStat.getPath.toString" + fileStat.getPath.toString)
            fileList.foreach(println)
            fileList += fileStat.getPath.toString
          } else if (fileStat.getLen == 0) {
            logInfo(" length zero files \n " + fileStat)
    
            // fs.rename(fileStat.getPath, new Path(fileStat.getPath+"1"))
          }
        }
        fileList
      }
    

    喜欢这个例子

        val fs = FileSystem.get(new URI(inputPath), spark.sparkContext.hadoopConfiguration)
     yourFiles = getAllFilePath(new Path(inputPath), fs)
    
    val df = spark.read.parquet(yourFiles:_*)
    

    【讨论】:

    • 已解决问题还是您的问题有所不同?
    猜你喜欢
    • 2023-03-08
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2016-08-23
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2015-07-30
    相关资源
    最近更新 更多