所有与文本相关的数据源,包括CSVDataSource,都使用 Hadoop File API 来处理文件(它也在 Spark Core 的 RDD 中)。
您可以在readFile 中找到导致HadoopFileLinesReader 的相关行,其中包含以下行:
val fileSplit = new FileSplit(
new Path(new URI(file.filePath)),
file.start,
file.length,
// TODO: Implement Locality
Array.empty)
使用 Hadoop 的 org.apache.hadoop.fs.Path 处理底层文件的压缩。
快速谷歌搜索后,我找到了处理压缩的 Hadoop 属性,即mapreduce.output.fileoutputformat.compress。
这使我使用了以下压缩配置的 Spark SQL 的 CompressionCodecs:
"none" -> null,
"uncompressed" -> null,
"bzip2" -> classOf[BZip2Codec].getName,
"deflate" -> classOf[DeflateCodec].getName,
"gzip" -> classOf[GzipCodec].getName,
"lz4" -> classOf[Lz4Codec].getName,
"snappy" -> classOf[SnappyCodec].getName)
在下面的代码中,您可以找到使用“我们的”选项的setCodecConfiguration。
def setCodecConfiguration(conf: Configuration, codec: String): Unit = {
if (codec != null) {
conf.set("mapreduce.output.fileoutputformat.compress", "true")
conf.set("mapreduce.output.fileoutputformat.compress.type", CompressionType.BLOCK.toString)
conf.set("mapreduce.output.fileoutputformat.compress.codec", codec)
conf.set("mapreduce.map.output.compress", "true")
conf.set("mapreduce.map.output.compress.codec", codec)
} else {
// This infers the option `compression` is set to `uncompressed` or `none`.
conf.set("mapreduce.output.fileoutputformat.compress", "false")
conf.set("mapreduce.map.output.compress", "false")
}
}
另一种方法getCodecClassName 用于解析JSON、CSV 和text 格式的compression 选项。