【问题标题】:How does Spark SQL read compressed csv files?Spark SQL如何读取压缩的csv文件?
【发布时间】:2017-12-02 02:14:02
【问题描述】:

我尝试使用 api spark.read.csv 读取扩展名为 bzgzip 的压缩 csv 文件。有效。但是在源代码中我没有找到任何可以声明codec 类型的选项参数。

即使在这个link中,也只有codec在书写侧的设置。谁能告诉我或提供显示 spark 2.x 版本如何处理压缩 csv 文件的源代码路径。

【问题讨论】:

  • 请注意,Spark 将使用单个任务读取压缩的 CSV,而不是在读取未压缩的 CSV 时并行读取多个任务。

标签: csv apache-spark apache-spark-sql


【解决方案1】:

所有与文本相关的数据源,包括CSVDataSource,都使用 Hadoop File API 来处理文件(它也在 Spark Core 的 RDD 中)。

您可以在readFile 中找到导致HadoopFileLinesReader 的相关行,其中包含以下行:

val fileSplit = new FileSplit(
  new Path(new URI(file.filePath)),
  file.start,
  file.length,
  // TODO: Implement Locality
  Array.empty)

使用 Hadoop 的 org.apache.hadoop.fs.Path 处理底层文件的压缩。


快速谷歌搜索后,我找到了处理压缩的 Hadoop 属性,即mapreduce.output.fileoutputformat.compress

这使我使用了以下压缩配置的 Spark SQL 的 CompressionCodecs

"none" -> null,
"uncompressed" -> null,
"bzip2" -> classOf[BZip2Codec].getName,
"deflate" -> classOf[DeflateCodec].getName,
"gzip" -> classOf[GzipCodec].getName,
"lz4" -> classOf[Lz4Codec].getName,
"snappy" -> classOf[SnappyCodec].getName)

在下面的代码中,您可以找到使用“我们的”选项的setCodecConfiguration

  def setCodecConfiguration(conf: Configuration, codec: String): Unit = {
    if (codec != null) {
      conf.set("mapreduce.output.fileoutputformat.compress", "true")
      conf.set("mapreduce.output.fileoutputformat.compress.type", CompressionType.BLOCK.toString)
      conf.set("mapreduce.output.fileoutputformat.compress.codec", codec)
      conf.set("mapreduce.map.output.compress", "true")
      conf.set("mapreduce.map.output.compress.codec", codec)
    } else {
      // This infers the option `compression` is set to `uncompressed` or `none`.
      conf.set("mapreduce.output.fileoutputformat.compress", "false")
      conf.set("mapreduce.map.output.compress", "false")
    }
  }

另一种方法getCodecClassName 用于解析JSONCSVtext 格式的compression 选项。

【讨论】:

  • 谢谢你,伙计。我检查了Path 包文件,仍然有点困惑。如果您能提供更多细节,那就太好了;就像Path 包的哪一部分处理压缩一样。再次感谢您。
  • 在处理压缩的 Spark SQL 代码中添加了一些附加链接。由于我对 Hadoop 的源代码一无所知,因此我将把它作为一个家庭练习留给你。
  • 非常感谢您的耐心和善意。我通过链阅读了getCodecClassName 代码。我发现那部分代码只在写作方面被调用。我没有在阅读方面找到用法。我认为这项工作可能由文件系统完成;但找不到证据。
  • 您引用的所有部分都涉及写入文件。因此,为什么所有提到的选项的名称中都有“输出”。问题是关于读取文件。
  • 有趣的信息,但就像一些评论者指出的那样,这只是关于写入端,而不是读取端。 This answer 没有展示 Spark 如何在读取时选择编解码器的相关内部结构,但它至少演示了如何指定自定义读取编解码器。
【解决方案2】:

您不必为 gz 压缩的 csv,tsv 文件做任何特别的事情,即可被 spark 2.x 版本读取。下面的代码用spark 2.0.2进行了尝试

val options= Map("sep" -> ",")
val csvRDD = spark.read.options(options).csv("file.csv.gz")

我对制表符分隔的 gz 文件也做了类似的处理

val options= Map("sep" -> "\t")
val csvRDD = spark.read.options(options).csv("file.tsv.gz")

您也可以指定文件夹以读取多个.gz文件并结合解压缩文件

 val csvRDD = spark.read.options(options).csv("/users/mithun/tsvfilelocation/")

【讨论】:

    猜你喜欢
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2015-01-12
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2016-10-04
    相关资源
    最近更新 更多