【问题标题】:Spark Standalone Mode: How to compress spark output written to HDFSSpark Standalone Mode:如何压缩写入 HDFS 的 spark 输出
【发布时间】:2013-06-18 22:27:31
【问题描述】:

与我的其他问题有关,但不同:

someMap.saveAsTextFile("hdfs://HOST:PORT/out")

如果我将 RDD 保存到 HDFS,我如何告诉 spark 使用 gzip 压缩输出? 在 Hadoop 中,可以设置

mapred.output.compress = true

并选择压缩算法

mapred.output.compression.codec = <<classname of compression codec>>

我将如何在 spark 中执行此操作?这也可以吗?

编辑:使用 spark-0.7.2

【问题讨论】:

    标签: scala compression hdfs apache-spark


    【解决方案1】:

    方法saveAsTextFile 需要使用编解码器类的附加可选参数。因此,对于您的示例,使用 gzip 应该是这样的:

    someMap.saveAsTextFile("hdfs://HOST:PORT/out", classOf[GzipCodec])
    

    更新

    由于您使用的是 0.7.2,因此您可以通过在启动时设置的配置选项来移植压缩代码。我不确定这是否能正常工作,但你需要从这个开始:

    conf.setCompressMapOutput(true)
    conf.set("mapred.output.compress", "true")
    conf.setMapOutputCompressorClass(c)
    conf.set("mapred.output.compression.codec", c.getCanonicalName)
    conf.set("mapred.output.compression.type", CompressionType.BLOCK.toString)
    

    到这样的事情:

    System.setProperty("spark.hadoop.mapred.output.compress", "true")
    System.setProperty("spark.hadoop.mapred.output.compression.codec", "true")
    System.setProperty("spark.hadoop.mapred.output.compression.codec", "org.apache.hadoop.io.compress.GzipCodec")
    System.setProperty("spark.hadoop.mapred.output.compression.type", "BLOCK")
    

    如果你让它工作,发布你的配置可能也会对其他人有所帮助。

    【讨论】:

    • 这适用于哪个版本的 spark?我正在使用 spark-0.7.2,但在编译时出现错误:error: too many arguments for method saveAsTextFile。我看到这是discussed
    • 我看到它在最新的 spark-0.8.0 中。由于这是一个相当重要的功能,因此必须将其拉出。
    • 啊,有道理。我一直在使用 master 分支,而不是 0.7.2。
    • 我已经测试了您的第二个 sn-p (System.setProperty(...) [...]),它立即适用于 0.7.2。谢谢:)
    • @noah 你设置了两次spark.hadoop.mapred.output.compression.codec,这是多余的,除非我遗漏了什么?
    【解决方案2】:

    另一种将 gzip 文件保存到 HDFS 或 Amazon S3 目录系统的方法是使用 saveAsHadoopFile 方法。

    someMap 是 RDD[(K,V)],如果你有 someMap 作为 RDD[V],你可以调用 someMap.map(line=>(line, "") 来使用 saveAsHadoopFile 方法。

    import org.apache.hadoop.io.compress.GzipCodec
    
    someMap.saveAsHadoopFile(output_folder_path, classOf[String], classOf[String], classOf[MultipleTextOutputFormat[String, String]], classOf[GzipCodec])
    

    【讨论】:

    • 是否可以在spark-defaults.xml 中以类似的方式设置这些参数,以便每个作业都可以使用它?我尝试将设置复制到spark-defaults.xml,但设置似乎没有被拾取。
    【解决方案3】:

    对于较新的 Spark 版本,请在您的 spark-defaults.xml 文件中执行以下操作。 (mapred 已弃用)。

    <property>
        <name>mapreduce.output.fileoutputformat.compress</name>
        <value>true</value>
    </property>
    <property>
        <name>mapreduce.output.fileoutputformat.compress.codec</name>
        <value>GzipCodec</value>
    </property>
    <property>
        <name>mapreduce.output.fileoutputformat.compress.type</name>
        <value>BLOCK</value>
    </property>
    

    【讨论】:

      【解决方案4】:

      这是对所有大多数版本的 spark 进行快速压缩的最简单/最短的方法。

      import org.apache.hadoop.io.SequenceFile.CompressionType
      
       /**
         * Set compression configurations to Hadoop `Configuration`.
         * `codec` should be a full class path
         */
        def setCodecConfiguration(conf: Configuration, codec: String): Unit = {
          if (codec != null) {
            conf.set("mapreduce.output.fileoutputformat.compress", "true")
            conf.set("mapreduce.output.fileoutputformat.compress.type", CompressionType.BLOCK.toString) // "BLOCK" as string
            conf.set("mapreduce.output.fileoutputformat.compress.codec", codec)
            conf.set("mapreduce.map.output.compress", "true")
            conf.set("mapreduce.map.output.compress.codec", codec)
          } else {
            // This infers the option `compression` is set to `uncompressed` or `none`.
            conf.set("mapreduce.output.fileoutputformat.compress", "false")
            conf.set("mapreduce.map.output.compress", "false")
          }
        }
      

      其中confspark.sparkContext.hadoopConfiguration

      codec上述方法中的字符串参数选项为

       1.none 
       2.uncompressed 
       3.bzip2 
       4.deflate 
       5.gzip 
       6.lz4 
       7.snappy
      

      【讨论】:

        猜你喜欢
        • 2013-06-18
        • 1970-01-01
        • 2020-09-15
        • 2018-06-02
        • 2017-06-17
        • 1970-01-01
        • 1970-01-01
        • 2019-05-30
        • 1970-01-01
        相关资源
        最近更新 更多