【问题标题】:How to unzip the files stored in hdfs using spark java如何使用 spark java 解压缩存储在 hdfs 中的文件
【发布时间】:2018-05-22 08:59:54
【问题描述】:
List<String> list= jsc.wholeTextFiles(hdfsPath).keys().collect();
        for (String string : list) {
        System.out.println(string);
        }

我在这里获取所有 zip 文件。从这里我无法继续如何提取每个文件并使用相同的 zipname 文件夹存储到 hdfs 路径中

【问题讨论】:

  • 我建议您可以使用 Java 进行本机编码并进行解压缩。 Spark 可以帮助您使用 wholeTextFiles 读取文件

标签: java hadoop apache-spark hdfs


【解决方案1】:

你可以像下面这样使用,但在将内容写入 hdfs 之前,我们只需要在zipFilesRdd.collect().forEach 收集。地图和平面地图在这一点上提供了不可序列化的任务。

public void readWriteZipContents(String zipLoc,String hdfsBasePath){
    JavaSparkContext jsc = new JavaSparkContext(new SparkContext(new SparkConf()));
    JavaPairRDD<String, PortableDataStream> zipFilesRdd = jsc.binaryFiles(zipLoc);
    zipFilesRdd.collect().forEach(file -> {
        ZipInputStream zipStream = new ZipInputStream(file._2.open());
        ZipEntry zipEntry = null;
        Scanner sc = new Scanner(zipStream);
        try {
            while ((zipEntry = zipStream.getNextEntry()) != null) {
                String entryName = zipEntry.getName();
                if (!zipEntry.isDirectory()) {
                    //create the path in hdfs and write its contents
                   Configuration configuration = new Configuration();
                    configuration.set("fs.hdfs.impl", org.apache.hadoop.hdfs.DistributedFileSystem.class.getName());
                    configuration.set("fs.file.impl", org.apache.hadoop.fs.LocalFileSystem.class.getName());
                    FileSystem fs = FileSystem.get(URI.create("hdfs://localhost:8020"), configuration);
                    FSDataOutputStream hdfsfile = fs.create(new Path(hdfsBasePath + "/" + entryName));
                   while(sc.hasNextLine()){
                       hdfsfile.writeBytes(sc.nextLine());
                   }
                   hdfsfile.close();
                   hdfsfile.flush();
                }
                zipStream.closeEntry();
            }
        } catch (IllegalArgumentException e) {
            // TODO Auto-generated catch block
            e.printStackTrace();
        } catch (IOException e) {
            // TODO Auto-generated catch block
            e.printStackTrace();
        }
        sc.close();
        //return fileNames.iterator();
    });
}

【讨论】:

  • java.lang.IllegalArgumentException:错误的 FS:hdfs://localhost:8020/Logs,预期:file:///
  • 你在哪里得到这个异常。?好像你在本地模式下运行火花。给出完整的堆栈跟踪。以及您在哪一行得到异常。
  • 我们在 maven 中使用 spark 库,我正在从本地机器读取 zip 文件并解压缩到 hdfs
  • 好的,如果您从本地读取 zip 文件,那么您需要提供带有“file:///”的路径,正如它在异常中所说的那样。例如,jsc.binaryFiles("file:///localLocation")
  • 本地文件能够读取,但是在将文件保存到 hdfs 时在 FSDataOutputStream hdfsfile = FileSystem.get(jsc.hadoopConfiguration()).create(new Path(hdfsBasePath+"/"+entryName) 处出错);
【解决方案2】:

使用 gzip 文件,wholeTextFiles 应该自动对所有内容进行压缩。 但是,对于 zip 文件,我知道的唯一方法是使用 binaryFiles 并手动解压缩数据。

sc
    .binaryFiles(hdfsDir)
    .mapValues(x=> { 
        var result = scala.collection.mutable.ArrayBuffer.empty[String]
        val zis = new ZipInputStream(x.open())
        var entry : ZipEntry = null
        while({entry = zis.getNextEntry();entry} != null) {
            val scanner = new Scanner(zis)
            while (sc.hasNextLine()) {result+=sc.nextLine()} 
        }
        zis.close()
        result
    }

这为您提供了一个(对)RDD[String, ArrayBuffer[String]],其中键是 hdfs 上的文件名,值是 zip 文件的解压缩内容(ArrayBuffer 的每个元素一行)。如果给定的 zip 文件包含多个文件,则所有内容都会被聚合。您可以调整代码以满足您的确切需求。例如,flatMapValues 而不是 mapValues 将展平所有内容 (RDD[String, String]) 以利用 spark 的并行性。

还要注意,在 while 条件下,“{entry = is.getNextEntry();entry} 可以在 java 中替换为 (entry = is.getNextEntry())。但是在 scala 中,做作的结果是 Unit 所以这将产生一个无限循环。

【讨论】:

    【解决方案3】:

    想出这个用 Scala 编写的解决方案。

    使用 spark2(版本 2.3.0.cloudera2)、scala(版本 2.11.8)测试

    def extractHdfsZipFile(source_zip : String, target_folder : String,
        sparksession : SparkSession) : Boolean = {
    
        val hdfs_config = sparksession.sparkContext.hadoopConfiguration
        val buffer = new Array[Byte](1024)
    
        /*
         .collect -> run on driver only, not able to serialize hdfs Configuration
        */
        val zip_files = sparksession.sparkContext.binaryFiles(source_zip).collect.
          foreach{ zip_file: (String, PortableDataStream) =>
            // iterate over zip_files
            val zip_stream : ZipInputStream = new ZipInputStream(zip_file._2.open)
            var zip_entry: ZipEntry = null
    
            try {
              // iterate over all ZipEntry from ZipInputStream
              while ({zip_entry = zip_stream.getNextEntry; zip_entry != null}) {
                // skip directory
                if (!zip_entry.isDirectory()) {
                  println(s"Extract File: ${zip_entry.getName()}, with Size: ${zip_entry.getSize()}")
                  // create new hdfs file
                  val fs : FileSystem = FileSystem.get(hdfs_config)
                  val hdfs_file : FSDataOutputStream = fs.create(new Path(target_folder + "/" + zip_entry.getName()))
    
                  var len : Int = 0
                  // write until zip_stream is null
                  while({len = zip_stream.read(buffer); len > 0}) {
                    hdfs_file.write(buffer, 0, len)
                  }
                  // close and flush hdfs_file
                  hdfs_file.close()
                  hdfs_file.flush()
                }
                zip_stream.closeEntry()
              }
              zip_stream.close()
            } catch {
              case zip : ZipException => {
                println(zip.printStackTrace)
                println("Please verify that you do not use compresstype9.")
                // for DEBUG throw exception
                //false
                throw zip
              }
              case e : Exception => {
                println(e.printStackTrace)
                // for DEBUG throw exception
                //false
                throw e
              }
            }
        }
        true
      }
    

    【讨论】:

      猜你喜欢
      • 2017-06-17
      • 2017-09-16
      • 1970-01-01
      • 1970-01-01
      • 2021-10-02
      • 2022-07-27
      • 1970-01-01
      • 1970-01-01
      • 2011-10-30
      相关资源
      最近更新 更多