1.hadoop的压缩codec
Codec为压缩,解压缩的算法实现。 在Hadoop中,codec由CompressionCode的实现来表示。下面是一些实现:
可分割性:可分割与不可分割的区别:文件是否可被切成多个inputsplit。
对于不能切割的文件,如果使用mapreduce算法,需要切割成一个inputsplit,那么这个文件在网络传输的时候必须连着传输
中间一旦传输失败就必须重传,一个inputsplit一般都比较小,对于文件比较大的文件可否分割也是影响性能的重要因素。
压缩比和压缩速度上综合相比LZO好一些,压缩速度是其他的十几倍,但是压缩比相差不大。
mapreduce中那些文件可压缩?
1. 输入文件可能是压缩文件
2.map输出可能是压缩文件
3.reduce输出可以压缩
代码:
完整的测试代码:
1 package Mapreduce; 2 3 import java.io.IOException; 4 5 import org.apache.hadoop.conf.Configuration; 6 import org.apache.hadoop.fs.Path; 7 import org.apache.hadoop.io.LongWritable; 8 import org.apache.hadoop.io.Text; 9 import org.apache.hadoop.io.compress.CompressionCodec; 10 import org.apache.hadoop.io.compress.GzipCodec; 11 import org.apache.hadoop.mapreduce.Job; 12 import org.apache.hadoop.mapreduce.Mapper; 13 import org.apache.hadoop.mapreduce.Reducer; 14 import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; 15 import org.apache.hadoop.mapreduce.lib.input.TextInputFormat; 16 import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; 17 import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat; 18 import org.apache.hadoop.yarn.webapp.hamlet.Hamlet.MAP; 19 20 /** 21 * 压缩文件实例 22 * 23 * 24 */ 25 public class CompressTest { 26 public static void main(String[] args) throws Exception { 27 //必须要传递的是自定的mapper和reducer的类,输入输出的路径必须指定,输出的类型<k3,v3>必须指定 28 //2将自定义的MyMapper和MyReducer组装在一起 29 Configuration conf=new Configuration(); 30 String jobName=CompressTest.class.getSimpleName(); 31 //1首先寫job,知道需要conf和jobname在去創建即可 32 Job job = Job.getInstance(conf, jobName); 33 34 //*13最后,如果要打包运行改程序,则需要调用如下行 35 job.setJarByClass(CompressTest.class); 36 37 //3读取HDFS內容:FileInputFormat在mapreduce.lib包下 38 FileInputFormat.setInputPaths(job, new Path("hdfs://neusoft-master:9000/data/hellodemo")); 39 //4指定解析<k1,v1>的类(谁来解析键值对) 40 //*指定解析的类可以省略不写,因为设置解析类默认的就是TextInputFormat.class 41 job.setInputFormatClass(TextInputFormat.class); 42 //5指定自定义mapper类 43 job.setMapperClass(MyMapper.class); 44 //6指定map输出的key2的类型和value2的类型 <k2,v2> 45 //*下面两步可以省略,当<k3,v3>和<k2,v2>类型一致的时候,<k2,v2>类型可以不指定 46 job.setMapOutputKeyClass(Text.class); 47 job.setMapOutputValueClass(LongWritable.class); 48 //7分区(默认1个),排序,分组,规约 采用 默认 49 50 //**map端输出进行压缩 51 conf.setBoolean ("mapred.compress.map.output",true); 52 //**reduce端输出进行压缩 53 conf.setBoolean ("mapred.output.compress",true); 54 //**reduce端输出压缩使用的类 55 conf.setClass("mapred.output.compression.codec", GzipCodec.class, CompressionCodec.class); 56 //接下来采用reduce步骤 57 //8指定自定义的reduce类 58 job.setReducerClass(MyReducer.class); 59 //9指定输出的<k3,v3>类型 60 job.setOutputKeyClass(Text.class); 61 job.setOutputValueClass(LongWritable.class); 62 //10指定输出<K3,V3>的类 63 //*下面这一步可以省 64 job.setOutputFormatClass(TextOutputFormat.class); 65 //11指定输出路径 66 FileOutputFormat.setOutputPath(job, new Path("hdfs://neusoft-master:9000/out11")); 67 68 //12写的mapreduce程序要交给resource manager运行 69 job.waitForCompletion(true); 70 } 71 private static class MyMapper extends Mapper<LongWritable, Text, Text,LongWritable>{ 72 Text k2 = new Text(); 73 LongWritable v2 = new LongWritable(); 74 @Override 75 protected void map(LongWritable key, Text value,//三个参数 76 Mapper<LongWritable, Text, Text, LongWritable>.Context context) 77 throws IOException, InterruptedException { 78 String line = value.toString(); 79 String[] splited = line.split("\t");//因为split方法属于string字符的方法,首先应该转化为string类型在使用 80 for (String word : splited) { 81 //word表示每一行中每个单词 82 //对K2和V2赋值 83 k2.set(word); 84 v2.set(1L); 85 context.write(k2, v2); 86 } 87 } 88 } 89 private static class MyReducer extends Reducer<Text, LongWritable, Text, LongWritable> { 90 LongWritable v3 = new LongWritable(); 91 @Override //k2表示单词,v2s表示不同单词出现的次数,需要对v2s进行迭代 92 protected void reduce(Text k2, Iterable<LongWritable> v2s, //三个参数 93 Reducer<Text, LongWritable, Text, LongWritable>.Context context) 94 throws IOException, InterruptedException { 95 long sum =0; 96 for (LongWritable v2 : v2s) { 97 //LongWritable本身是hadoop类型,sum是java类型 98 //首先将LongWritable转化为字符串,利用get方法 99 sum+=v2.get(); 100 } 101 v3.set(sum); 102 //将k2,v3写出去 103 context.write(k2, v3); 104 } 105 } 106 }