【发布时间】:2015-04-18 15:18:47
【问题描述】:
我有想要“通过”Spark 打开的 zip 文件。由于 Hadoop 原生 Codec 支持,我可以打开 .gzip 文件,但无法使用 .zip 文件。
是否有一种简单的方法可以读取 Spark 代码中的 zip 文件?我还搜索了要添加到 CompressionCodecFactory 的 zip 编解码器实现,但到目前为止没有成功。
【问题讨论】:
标签: hadoop apache-spark
我有想要“通过”Spark 打开的 zip 文件。由于 Hadoop 原生 Codec 支持,我可以打开 .gzip 文件,但无法使用 .zip 文件。
是否有一种简单的方法可以读取 Spark 代码中的 zip 文件?我还搜索了要添加到 CompressionCodecFactory 的 zip 编解码器实现,但到目前为止没有成功。
【问题讨论】:
标签: hadoop apache-spark
python 代码没有解决方案,我最近不得不阅读 pyspark 中的 zip。而且,在搜索如何做到这一点时,我遇到了这个问题。所以,希望这对其他人有所帮助。
import zipfile
import io
def zip_extract(x):
in_memory_data = io.BytesIO(x[1])
file_obj = zipfile.ZipFile(in_memory_data, "r")
files = [i for i in file_obj.namelist()]
return dict(zip(files, [file_obj.open(file).read() for file in files]))
zips = sc.binaryFiles("hdfs:/Testing/*.zip")
files_data = zips.map(zip_extract).collect()
在上面的代码中,我返回了一个字典,其中 zip 中的文件名作为键,每个文件中的文本数据作为值。您可以根据自己的目的进行更改。
【讨论】:
bz2 文件应用相同的逻辑。我试图无法将 rdd 转换为 BytesIO
bz2.decompress解压内存中的bz2。例如decompressed_x = bz2.decompress(x[1]).
.gz 文件吗?即使在转换为数据框后,我也无法将其显示为数据框,因为我有一个文本文件
@user3591785 为我指出了正确的方向,因此我将他的答案标记为正确。
要了解更多细节,我可以搜索 ZipFileInputFormat Hadoop,并看到此链接:http://cotdp.com/2012/07/hadoop-processing-zip-files-in-mapreduce/
使用 ZipFileInputFormat 及其帮助程序 ZipfileRecordReader 类,我能够让 Spark 完美打开并读取 zip 文件。
rdd1 = sc.newAPIHadoopFile("/Users/myname/data/compressed/target_file.ZIP", ZipFileInputFormat.class, Text.class, Text.class, new Job().getConfiguration());
结果是一张只有一个元素的地图。文件名作为键,内容作为值,所以我需要将其转换为 JavaPairRdd。我敢肯定,如果你愿意,你可以用 BytesWritable 替换 Text,然后用其他东西替换 ArrayList,但我的目标是首先让一些东西运行起来。
JavaPairRDD<String, String> rdd2 = rdd1.flatMapToPair(new PairFlatMapFunction<Tuple2<Text, Text>, String, String>() {
@Override
public Iterable<Tuple2<String, String>> call(Tuple2<Text, Text> textTextTuple2) throws Exception {
List<Tuple2<String,String>> newList = new ArrayList<Tuple2<String, String>>();
InputStream is = new ByteArrayInputStream(textTextTuple2._2.getBytes());
BufferedReader br = new BufferedReader(new InputStreamReader(is, "UTF-8"));
String line;
while ((line = br.readLine()) != null) {
Tuple2 newTuple = new Tuple2(line.split("\\t")[0],line);
newList.add(newTuple);
}
return newList;
}
});
【讨论】:
请尝试以下代码:
using API sparkContext.newAPIHadoopRDD(
hadoopConf,
InputFormat.class,
ImmutableBytesWritable.class, Result.class)
【讨论】:
我也遇到过类似的问题,我用下面的代码解决了
sparkContext.binaryFiles("/pathToZipFiles/*")
.flatMap { case (zipFilePath, zipContent) =>
val zipInputStream = new ZipInputStream(zipContent.open())
Stream.continually(zipInputStream.getNextEntry)
.takeWhile(_ != null)
.flatMap { zipEntry => ??? }
}
【讨论】:
本回答只收集以前的知识,分享一下我的经验。
我尝试关注@Tinku 和@JeffLL 的答案,并将导入的ZipFileInputFormat 与sc.newAPIHadoopFile API 一起使用。 但这对我不起作用。而且我不知道如何将com-cotdp-hadoop lib 放在我的生产集群上。我不负责设置。
@Tiago Palma 给出了一个很好的建议,但他没有完成他的回答,我挣扎了很长时间才真正得到解压缩的输出。
当我能够这样做时,我必须准备所有理论方面,您可以在我的回答中找到:https://stackoverflow.com/a/45958182/1549135
但提到的答案中缺少的部分是阅读ZipEntry:
import java.util.zip.ZipInputStream;
import java.io.BufferedReader;
import java.io.InputStreamReader;
sc.binaryFiles(path, minPartitions)
.flatMap { case (name: String, content: PortableDataStream) =>
val zis = new ZipInputStream(content.open)
Stream.continually(zis.getNextEntry)
.takeWhile(_ != null)
.flatMap { _ =>
val br = new BufferedReader(new InputStreamReader(zis))
Stream.continually(br.readLine()).takeWhile(_ != null)
}}
【讨论】:
using API sparkContext.newAPIHadoopRDD(hadoopConf, InputFormat.class, ImmutableBytesWritable.class, Result.class)
文件名应该使用conf传递
conf=( new Job().getConfiguration())
conf.set(PROPERTY_NAME from your input formatter,"Zip file address")
sparkContext.newAPIHadoopRDD(conf, ZipFileInputFormat.class, Text.class, Text.class)
请从您的输入格式化程序中找到PROPERTY_NAME 以设置路径
【讨论】:
试试:
from pyspark.sql import SparkSession
spark = SparkSession.builder.getOrCreate()
spark.read.text("yourGzFile.gz")
【讨论】: