【问题标题】:How to open/stream .zip files through Spark?如何通过 Spark 打开/流式传输 .zip 文件?
【发布时间】:2015-04-18 15:18:47
【问题描述】:

我有想要“通过”Spark 打开的 zip 文件。由于 Hadoop 原生 Codec 支持,我可以打开 .gzip 文件,但无法使用 .zip 文件。

是否有一种简单的方法可以读取 Spark 代码中的 zip 文件?我还搜索了要添加到 CompressionCodecFactory 的 zip 编解码器实现,但到目前为止没有成功。

【问题讨论】:

    标签: hadoop apache-spark


    【解决方案1】:

    python 代码没有解决方案,我最近不得不阅读 pyspark 中的 zip。而且,在搜索如何做到这一点时,我遇到了这个问题。所以,希望这对其他人有所帮助。

    import zipfile
    import io
    
    def zip_extract(x):
        in_memory_data = io.BytesIO(x[1])
        file_obj = zipfile.ZipFile(in_memory_data, "r")
        files = [i for i in file_obj.namelist()]
        return dict(zip(files, [file_obj.open(file).read() for file in files]))
    
    
    zips = sc.binaryFiles("hdfs:/Testing/*.zip")
    files_data = zips.map(zip_extract).collect()
    

    在上面的代码中,我返回了一个字典,其中 zip 中的文件名作为键,每个文件中的文本数据作为值。您可以根据自己的目的进行更改。

    【讨论】:

    • 这适用于我不太大的 zip 文件。另一个有趣的部分是,一旦你有了解压后的 zip 文件的二进制文件,就没有简单的方法可以将其转换为 hdfs 或 s3。我要做的是使用 python 将它写入本地文件,然后从那里取出并将其移动到 s3。
    • 有没有办法可以为bz2 文件应用相同的逻辑。我试图无法将 rdd 转换为 BytesIO
    • @GaurangShah,有。可以使用bz2.decompress解压内存中的bz2。例如decompressed_x = bz2.decompress(x[1]).
    • 这也适用于.gz 文件吗?即使在转换为数据框后,我也无法将其显示为数据框,因为我有一个文本文件
    【解决方案2】:

    @user3591785 为我指出了正确的方向,因此我将他的答案标记为正确。

    要了解更多细节,我可以搜索 ZipFileInputFormat Hadoop,并看到此链接:http://cotdp.com/2012/07/hadoop-processing-zip-files-in-mapreduce/

    使用 ZipFileInputFormat 及其帮助程序 ZipfileRecordReader 类,我能够让 Spark 完美打开并读取 zip 文件。

        rdd1  = sc.newAPIHadoopFile("/Users/myname/data/compressed/target_file.ZIP", ZipFileInputFormat.class, Text.class, Text.class, new Job().getConfiguration());
    

    结果是一张只有一个元素的地图。文件名作为键,内容作为值,所以我需要将其转换为 JavaPairRdd。我敢肯定,如果你愿意,你可以用 BytesWritable 替换 Text,然后用其他东西替换 ArrayList,但我的目标是首先让一些东西运行起来。

    JavaPairRDD<String, String> rdd2 = rdd1.flatMapToPair(new PairFlatMapFunction<Tuple2<Text, Text>, String, String>() {
    
        @Override
        public Iterable<Tuple2<String, String>> call(Tuple2<Text, Text> textTextTuple2) throws Exception {
            List<Tuple2<String,String>> newList = new ArrayList<Tuple2<String, String>>();
    
            InputStream is = new ByteArrayInputStream(textTextTuple2._2.getBytes());
            BufferedReader br = new BufferedReader(new InputStreamReader(is, "UTF-8"));
    
            String line;
    
            while ((line = br.readLine()) != null) {
    
            Tuple2 newTuple = new Tuple2(line.split("\\t")[0],line);
                newList.add(newTuple);
            }
            return newList;
        }
    });
    

    【讨论】:

      【解决方案3】:

      请尝试以下代码:

      using API sparkContext.newAPIHadoopRDD(
          hadoopConf,
          InputFormat.class,
          ImmutableBytesWritable.class, Result.class)
      

      【讨论】:

      • 谢谢,但可以提供一个示例用例吗?
      【解决方案4】:

      我也遇到过类似的问题,我用下面的代码解决了

      sparkContext.binaryFiles("/pathToZipFiles/*")
      .flatMap { case (zipFilePath, zipContent) =>
      
              val zipInputStream = new ZipInputStream(zipContent.open())
      
              Stream.continually(zipInputStream.getNextEntry)
              .takeWhile(_ != null)
              .flatMap { zipEntry => ??? }
          }
      

      【讨论】:

      【解决方案5】:

      本回答只收集以前的知识,分享一下我的经验。

      ZipFileInputFormat

      我尝试关注@Tinku@JeffLL 的答案,并将导入的ZipFileInputFormatsc.newAPIHadoopFile API 一起使用。 但这对我不起作用。而且我不知道如何将com-cotdp-hadoop lib 放在我的生产集群上。我不负责设置。

      ZipInputStream

      @Tiago Palma 给出了一个很好的建议,但他没有完成他的回答,我挣扎了很长时间才真正得到解压缩的输出。

      当我能够这样做时,我必须准备所有理论方面,您可以在我的回答中找到:https://stackoverflow.com/a/45958182/1549135

      但提到的答案中缺少的部分是阅读ZipEntry

      import java.util.zip.ZipInputStream;
      import java.io.BufferedReader;
      import java.io.InputStreamReader;   
      
      sc.binaryFiles(path, minPartitions)
            .flatMap { case (name: String, content: PortableDataStream) =>
              val zis = new ZipInputStream(content.open)
              Stream.continually(zis.getNextEntry)
                    .takeWhile(_ != null)
                    .flatMap { _ =>
                        val br = new BufferedReader(new InputStreamReader(zis))
                        Stream.continually(br.readLine()).takeWhile(_ != null)
                    }}
       
      

      【讨论】:

      • 能否解压大文件。这就像 10 到 12gigs 的 Zip 文件。我得到内存异常
      • 我们可以为 7z 文件做吗
      【解决方案6】:
      using API sparkContext.newAPIHadoopRDD(hadoopConf, InputFormat.class, ImmutableBytesWritable.class, Result.class) 
      

      文件名应该使用conf传递

      conf=( new Job().getConfiguration())
      conf.set(PROPERTY_NAME from your input formatter,"Zip file address")
      sparkContext.newAPIHadoopRDD(conf, ZipFileInputFormat.class, Text.class, Text.class)
      

      请从您的输入格式化程序中找到PROPERTY_NAME 以设置路径

      【讨论】:

      • 使用上面的代码,我能够成功执行它直到 56MB,但是对于大小为 338MB 的文件却失败了,我最终得到了异常 java.lang.OutOfMemoryError: Java heap space at java .util.Arrays.copyOf(Arrays.java:3236) 在 java.io.ByteArrayOutputStream.grow(ByteArrayOutputStream.java:118) 在 java.io.ByteArrayOutputStream.ensureCapacity(ByteArrayOutputStream.java:93) 在 java.io.ByteArrayOutputStream。在 hydrograph.engine.spark.zipread.ZipFileRecordReader.nextKeyValue(ZipFileRecordReader.java:105) 处写入(ByteArrayOutputStream.java:153)......
      【解决方案7】:

      试试:

      from pyspark.sql import SparkSession
      spark = SparkSession.builder.getOrCreate()
      spark.read.text("yourGzFile.gz")
      

      【讨论】:

      • 虽然此代码可能会回答问题,但提供有关它如何和/或为什么解决问题的额外上下文将提高​​答案的长期价值。
      猜你喜欢
      • 2010-10-12
      • 1970-01-01
      • 2016-01-28
      • 1970-01-01
      • 2013-03-18
      • 2017-07-23
      • 2021-03-15
      • 1970-01-01
      • 2018-05-19
      相关资源
      最近更新 更多