1.hadoop的压缩codec

    Codec为压缩,解压缩的算法实现。 在Hadoop中,codec由CompressionCode的实现来表示。下面是一些实现:

  Mapreduce其他部分

  Mapreduce其他部分

可分割性:可分割与不可分割的区别:文件是否可被切成多个inputsplit。

       对于不能切割的文件,如果使用mapreduce算法,需要切割成一个inputsplit,那么这个文件在网络传输的时候必须连着传输

中间一旦传输失败就必须重传,一个inputsplit一般都比较小,对于文件比较大的文件可否分割也是影响性能的重要因素。

        压缩比和压缩速度上综合相比LZO好一些,压缩速度是其他的十几倍,但是压缩比相差不大。

         mapreduce中那些文件可压缩?

         1. 输入文件可能是压缩文件

         2.map输出可能是压缩文件

         3.reduce输出可以压缩

 代码:

         Mapreduce其他部分  

 完整的测试代码:   

  1 package Mapreduce;
  2 
  3 import java.io.IOException;
  4 
  5 import org.apache.hadoop.conf.Configuration;
  6 import org.apache.hadoop.fs.Path;
  7 import org.apache.hadoop.io.LongWritable;
  8 import org.apache.hadoop.io.Text;
  9 import org.apache.hadoop.io.compress.CompressionCodec;
 10 import org.apache.hadoop.io.compress.GzipCodec;
 11 import org.apache.hadoop.mapreduce.Job;
 12 import org.apache.hadoop.mapreduce.Mapper;
 13 import org.apache.hadoop.mapreduce.Reducer;
 14 import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
 15 import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
 16 import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
 17 import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
 18 import org.apache.hadoop.yarn.webapp.hamlet.Hamlet.MAP;
 19 
 20 /**
 21  * 压缩文件实例
 22  *
 23  * 
 24  */
 25 public class CompressTest {
 26     public static void main(String[] args) throws Exception {
 27         //必须要传递的是自定的mapper和reducer的类,输入输出的路径必须指定,输出的类型<k3,v3>必须指定
 28         //2将自定义的MyMapper和MyReducer组装在一起
 29         Configuration conf=new Configuration();
 30         String jobName=CompressTest.class.getSimpleName();
 31         //1首先寫job,知道需要conf和jobname在去創建即可
 32         Job job = Job.getInstance(conf, jobName);
 33         
 34         //*13最后,如果要打包运行改程序,则需要调用如下行
 35         job.setJarByClass(CompressTest.class);
 36         
 37         //3读取HDFS內容:FileInputFormat在mapreduce.lib包下
 38         FileInputFormat.setInputPaths(job, new Path("hdfs://neusoft-master:9000/data/hellodemo"));
 39         //4指定解析<k1,v1>的类(谁来解析键值对)
 40         //*指定解析的类可以省略不写,因为设置解析类默认的就是TextInputFormat.class
 41         job.setInputFormatClass(TextInputFormat.class);
 42         //5指定自定义mapper类
 43         job.setMapperClass(MyMapper.class);
 44         //6指定map输出的key2的类型和value2的类型  <k2,v2>
 45         //*下面两步可以省略,当<k3,v3>和<k2,v2>类型一致的时候,<k2,v2>类型可以不指定
 46         job.setMapOutputKeyClass(Text.class);
 47         job.setMapOutputValueClass(LongWritable.class);
 48         //7分区(默认1个),排序,分组,规约 采用 默认
 49         
 50         //**map端输出进行压缩
 51         conf.setBoolean    ("mapred.compress.map.output",true);
 52         //**reduce端输出进行压缩
 53         conf.setBoolean    ("mapred.output.compress",true);
 54         //**reduce端输出压缩使用的类
 55         conf.setClass("mapred.output.compression.codec", GzipCodec.class, CompressionCodec.class);
 56         //接下来采用reduce步骤
 57         //8指定自定义的reduce类
 58         job.setReducerClass(MyReducer.class);
 59         //9指定输出的<k3,v3>类型
 60         job.setOutputKeyClass(Text.class);
 61         job.setOutputValueClass(LongWritable.class);
 62         //10指定输出<K3,V3>的类
 63         //*下面这一步可以省
 64         job.setOutputFormatClass(TextOutputFormat.class);
 65         //11指定输出路径
 66         FileOutputFormat.setOutputPath(job, new Path("hdfs://neusoft-master:9000/out11"));
 67         
 68         //12写的mapreduce程序要交给resource manager运行
 69         job.waitForCompletion(true);
 70     }
 71     private static class MyMapper extends Mapper<LongWritable, Text, Text,LongWritable>{
 72         Text k2 = new Text();
 73         LongWritable v2 = new LongWritable();
 74         @Override
 75         protected void map(LongWritable key, Text value,//三个参数
 76                 Mapper<LongWritable, Text, Text, LongWritable>.Context context) 
 77                 throws IOException, InterruptedException {
 78             String line = value.toString();
 79             String[] splited = line.split("\t");//因为split方法属于string字符的方法,首先应该转化为string类型在使用
 80             for (String word : splited) {
 81                 //word表示每一行中每个单词
 82                 //对K2和V2赋值
 83                 k2.set(word);
 84                 v2.set(1L);
 85                 context.write(k2, v2);
 86             }
 87         }
 88     }
 89     private static class MyReducer extends Reducer<Text, LongWritable, Text, LongWritable> {
 90         LongWritable v3 = new LongWritable();
 91         @Override //k2表示单词,v2s表示不同单词出现的次数,需要对v2s进行迭代
 92         protected void reduce(Text k2, Iterable<LongWritable> v2s,  //三个参数
 93                 Reducer<Text, LongWritable, Text, LongWritable>.Context context)
 94                 throws IOException, InterruptedException {
 95             long sum =0;
 96             for (LongWritable v2 : v2s) {
 97                 //LongWritable本身是hadoop类型,sum是java类型
 98                 //首先将LongWritable转化为字符串,利用get方法
 99                 sum+=v2.get();
100             }
101             v3.set(sum);
102             //将k2,v3写出去
103             context.write(k2, v3);
104         }
105     }
106 }
压缩文件实例

相关文章:

  • 2021-10-15
  • 2021-07-01
  • 2022-12-23
  • 2021-07-17
猜你喜欢
  • 2021-11-03
  • 2022-02-08
  • 2022-12-23
  • 2021-10-20
  • 2021-05-21
  • 2021-08-04
  • 2021-11-27
相关资源
相似解决方案