不多说,直接上代码。

  假如这里有一份邮箱数据文件,我们期望统计邮箱出现次数并按照邮箱的类别,将这些邮箱分别输出到不同文件路径下。

 Hadoop MapReduce编程 API入门系列之MapReduce多种输出格式分析(十九)

Hadoop MapReduce编程 API入门系列之MapReduce多种输出格式分析(十九)

Hadoop MapReduce编程 API入门系列之MapReduce多种输出格式分析(十九)

Hadoop MapReduce编程 API入门系列之MapReduce多种输出格式分析(十九)

Hadoop MapReduce编程 API入门系列之MapReduce多种输出格式分析(十九)

Hadoop MapReduce编程 API入门系列之MapReduce多种输出格式分析(十九)

Hadoop MapReduce编程 API入门系列之MapReduce多种输出格式分析(十九)

Hadoop MapReduce编程 API入门系列之MapReduce多种输出格式分析(十九)

Hadoop MapReduce编程 API入门系列之MapReduce多种输出格式分析(十九)

Hadoop MapReduce编程 API入门系列之MapReduce多种输出格式分析(十九)

Hadoop MapReduce编程 API入门系列之MapReduce多种输出格式分析(十九)

Hadoop MapReduce编程 API入门系列之MapReduce多种输出格式分析(十九)

Hadoop MapReduce编程 API入门系列之MapReduce多种输出格式分析(十九)

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

代码版本1

 

  1 package zhouls.bigdata.myMapReduce.Email;
  2 
  3 import java.io.IOException;
  4 import org.apache.hadoop.conf.Configuration;
  5 import org.apache.hadoop.conf.Configured;
  6 import org.apache.hadoop.fs.FileSystem;
  7 import org.apache.hadoop.fs.Path;
  8 import org.apache.hadoop.io.IntWritable;
  9 import org.apache.hadoop.io.LongWritable;
 10 import org.apache.hadoop.io.Text;
 11 import org.apache.hadoop.mapreduce.Job;
 12 import org.apache.hadoop.mapreduce.Mapper;
 13 import org.apache.hadoop.mapreduce.Reducer;
 14 import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
 15 import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
 16 import org.apache.hadoop.mapreduce.lib.output.MultipleOutputs;
 17 import org.apache.hadoop.util.Tool;
 18 import org.apache.hadoop.util.ToolRunner;
 19 
 20 //通过MultipleOutputs写到多个文件:参考博客http://www.cnblogs.com/codeOfLife/p/5452902.html
 21 
 22 //    MultipleOutputs 类可以将数据写到多个文件,这些文件的名称源于输出的键和值或者任意字符串。
 23 //  这允许每个 reducer(或者只有 map 作业的 mapper)创建多个文件。 采用name-m-nnnnn 形式的文件名用于 map 输出,name-r-nnnnn 形式的文件名用于 reduce 输出,
 24 //  其中 name 是由程序设定的任意名字, nnnnn 是一个指明块号的整数(从 0 开始)。块号保证从不同块(mapper 或 reducer)输出在相同名字情况下不会冲突。
 25 
 26 
 27 
 28 public class Email extends Configured implements Tool {
 29     public static class MailMapper extends Mapper<LongWritable, Text, Text, IntWritable> {
 30         private final static IntWritable one = new IntWritable(1);
 31 
 32         @Override
 33         protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
 34             context.write(value, one);
 35         }
 36     }
 37 
 38     
 39     public static class MailReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
 40         private IntWritable result = new IntWritable();
 41         private MultipleOutputs< Text, IntWritable> multipleOutputs;
 42 
 43         @Override
 44         protected void setup(Context context) throws IOException ,InterruptedException{
 45             multipleOutputs = new MultipleOutputs<Text, IntWritable>(context);
 46         }
 47         
 48         protected void reduce(Text Key, Iterable<IntWritable> Values,Context context) throws IOException, InterruptedException {
 49             int begin = Key.toString().indexOf("@");//indexOf方法返回一个整数值,指出 String 对象内子字符串的开始位置。
 50             int end = Key.toString().indexOf(".");//indexOf方法返回一个整数值,指出 String 对象内子字符串的开始位置。只不过我们自己写出个end变量而已
 51 //            Key.toString().indexOf(ch)
 52 //            Key.toString().indexOf(str)
 53 //            Key.toString().indexOf(ch, fromIndex)
 54 //            Key.toString().indexOf(str, fromIndex)
 55 //            Key.toString().intern()
 56             
 57 //            Java中字符串中子串的查找共有四种方法,如下:
 58 //            1、int indexOf(String str) :返回第一次出现的指定子字符串在此字符串中的索引。 
 59 //            2、int indexOf(String str, int startIndex):从指定的索引处开始,返回第一次出现的指定子字符串在此字符串中的索引。 
 60 //            3、int lastIndexOf(String str) :返回在此字符串中最右边出现的指定子字符串的索引。 
 61 //            4、int lastIndexOf(String str, int startIndex) :从指定的索引处开始向后搜索,返回在此字符串中最后一次出现的指定子字符串的索引。
 62             
 63             
 64             if(begin>=end){
 65                 return;
 66             }
 67             
 68             //获取邮箱类别,比如 qq
 69             String name = Key.toString().substring(begin+1, end);
 70 //                        String.subString(start,end)截取的字符串包括起点所在的字符串,不包括终点所在的字符串
 71             
 72             int sum = 0;
 73             
 74             for (IntWritable value : Values) {
 75                 sum += value.get();
 76             }
 77             
 78             result.set(sum);
 79             multipleOutputs.write(Key, result, name);
 80                         //这里,我们用到的是multipleOutputs.write(Text key, IntWritable value, String baseOutputPath);
 81             
 82 //            multipleOutputs.write默认有3种构造方法:
 83 //            multipleOutputs.write(String namedOutput, K key, V value);
 84 //            multipleOutputs.write(Text key, IntWritable value, String baseOutputPath);
 85 //            multipleOutputs.write(String namedOutput, K key, V value,String  baseOutputPath);
 86               
 87 //            MultipleOutputs 类可以将数据写到多个文件,这些文件的名称源于输出的键和值或者任意字符串。
 88 //            这允许每个 reducer(或者只有 map 作业的 mapper)创建多个文件。
 89 //             采用name-m-nnnnn 形式的文件名用于 map 输出,name-r-nnnnn 形式的文件名用于 reduce 输出,
 90 //             其中 name 是由程序设定的任意名字,
 91 //            nnnnn 是一个指明块号的整数(从 0 开始)。
 92 //             块号保证从不同块(mapper 或 reducer)写的输出在相同名字情况下不会冲突。
 93             
 94         }
 95         
 96         @Override
 97         protected void cleanup(Context context) throws IOException ,InterruptedException{
 98             multipleOutputs.close();
 99         }
100         
101     }
102 
103     public int run(String[] args) throws Exception {
104         Configuration conf = new Configuration();// 读取配置文件
105         
106         Path mypath = new Path(args[1]);
107         FileSystem hdfs = mypath.getFileSystem(conf);//创建输出路径
108         if (hdfs.isDirectory(mypath)) {
109             hdfs.delete(mypath, true);
110         }
111         Job job = Job.getInstance();// 新建一个任务
112         job.setJarByClass(Email.class);// 主类
113         
114         FileInputFormat.addInputPath(job, new Path(args[0]));// 输入路径
115         FileOutputFormat.setOutputPath(job, new Path(args[1]));// 输出路径
116 
117         job.setMapperClass(MailMapper.class);// Mapper
118         job.setReducerClass(MailReducer.class);// Reducer
119         
120         job.setOutputKeyClass(Text.class);// key输出类型
121         job.setOutputValueClass(IntWritable.class);// value输出类型
122         
123         job.waitForCompletion(true);
124         return 0;
125     }
126 
127     public static void main(String[] args) throws Exception {
128         String[] args0 = {
129                 "hdfs://HadoopMaster:9000/inputData/multipleOutputFormats/mail.txt",
130                 "hdfs://HadoopMaster:9000/outData/MultipleOutputFormats/" };
131         int ec = ToolRunner.run(new Configuration(), new Email(), args0);
132         System.exit(ec);
133     }
134 }

 

 

 

 

 

 

 

 

 

代码版本1

  1 package zhouls.bigdata.myMapReduce.Email;
  2 
  3 import java.io.IOException;
  4 import org.apache.hadoop.conf.Configuration;
  5 import org.apache.hadoop.conf.Configured;
  6 import org.apache.hadoop.fs.FileSystem;
  7 import org.apache.hadoop.fs.Path;
  8 import org.apache.hadoop.io.IntWritable;
  9 import org.apache.hadoop.io.LongWritable;
 10 import org.apache.hadoop.io.Text;
 11 import org.apache.hadoop.mapreduce.Job;
 12 import org.apache.hadoop.mapreduce.Mapper;
 13 import org.apache.hadoop.mapreduce.Reducer;
 14 import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
 15 import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
 16 import org.apache.hadoop.mapreduce.lib.output.MultipleOutputs;
 17 import org.apache.hadoop.util.Tool;
 18 import org.apache.hadoop.util.ToolRunner;
 19 
 20 
 21 //假如这里有一份邮箱数据文件,我们期望统计邮箱出现次数并按照邮箱的类别,将这些邮箱分别输出到不同文件路径下。
 22 /*wolys@21cn.com
 23 zss1984@126.com
 24 294522652@qq.com
 25 simulateboy@163.com
 26 zhoushigang_123@163.com
 27 sirenxing424@126.com
 28 lixinyu23@qq.com
 29 chenlei1201@gmail.com
 30 370433835@qq.com
 31 cxx0409@126.com
 32 viv093@sina.com
 33 q62148830@163.com
 34 65993266@qq.com
 35 summeredison@sohu.com
 36 zhangbao-autumn@163.com
 37 diduo_007@yahoo.com.cn
 38 fxh852@163.com
 39 
 40 
 41 /out/163-r-00000
 42 /out/126-r-00000
 43 /out/21cn-r-00000
 44 /out/gmail-r-00000
 45 /out/qq-r-00000
 46 /out/sina-r-00000
 47 /out/sohu-r-00000
 48 /out/yahoo-r-00000
 49 /out/part-r-00000
 50 */
 51 
 52 
 53 public class Email extends Configured implements Tool{
 54     public static class MailMapper extends Mapper<LongWritable, Text, Text, IntWritable>{
 55         private final static IntWritable one = new IntWritable(1);//赋值1给one
 56 
 57         @Override
 58         protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
 59             context.write(value, one);//将value和one写入到context里。    value是k2,one是v2
 60 //            context.write(new Text(value),new IntWritable(one));等价        
 61 //            key默认是行偏移量,可以自己自定义改
 62             
 63         }
 64     }
 65 
 66     
 67     
 68 //    MultipleOutputs将结果输出到多个文件或文件夹的步骤:
 69 //    见博客http://tydldd.iteye.com/blog/2053867
 70     
 71     
 72     public static class MailReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
 73         private IntWritable result = new IntWritable();
 74         private MultipleOutputs<Text, IntWritable> multipleOutputs;//MultipleOutputs将结果输出到多个文件或文件夹
 75 //        因为,MultipleOutputs是将结果输出到多个文件或文件夹,那么结果是什么,则就是k3,v3啦。即在这里就是MultipleOutputs<Text, IntWritable> multipleOutputs;
 76         
 77         
 78         //创建对象,以下是模板,别怕
 79         protected void setup(Context context) throws IOException ,InterruptedException{
 80             multipleOutputs = new MultipleOutputs<Text, IntWritable>(context);
 81         }
 82         
 83         protected void reduce(Text Key, Iterable<IntWritable> Values,Context context) throws IOException, InterruptedException{ 
 84         //294522652@qq.com
 85             int begin = Key.toString().indexOf("@");//indexOf() 方法可返回某个指定的字符串值在字符串中首次出现的位置。 即begin是9
 86             int end = Key.toString().indexOf(".");//indexOf() 方法可返回某个指定的字符串值在字符串中首次出现的位置。 即end是12
 87             if(begin>=end){
 88                 return;
 89             }
 90             
 91             //获取邮箱类别,比如 qq
 92             String name = Key.toString().substring(begin+1, end);//substring()是去除指定字符串的方法,及substring(10,12)
 93             int sum = 0;
 94             for (IntWritable value : Values) {//计数,for星型循环,即将Iterable<IntWritable> Values的值,一一传给IntWritable value
 95                 sum += value.get();//就是拿取IntWritable类型的value的值,给value类型的sum
 96             }
 97             result.set(sum);//即求和计数,如wolys@21cn.com出现了几次几次。
 98             multipleOutputs.write(Key, result, name);//将Key和result和name一起写入multipleOutputs
 99             
100              /*
101               * http://www.cnblogs.com/codeOfLife/p/5452902.html
102              * multipleOutputs.write(key, value, baseOutputPath)方法的第三个函数表明了该输出所在的目录(相对于用户指定的输出目录)。
103              * 如果baseOutputPath不包含文件分隔符"/",那么输出的文件格式为baseOutputPath-r-nnnnn(name-r-nnnnn);
104              * 如果包含文件分隔符"/",例如baseOutputPath="029070-99999/1901/part",那么输出文件则为029070-99999/1901/part-r-nnnnn
105              */
106         }
107         
108         //关闭对象,以下是模板,别怕
109         protected void cleanup(Context context) throws IOException ,InterruptedException{
110             multipleOutputs.close();
111         }
112     }
113 
114 
115     public int run(String[] arg0) throws Exception{
116         Configuration conf = new Configuration();// 读取配置文件
117         Path mypath = new Path(arg0[1]);//下标为1,即是输出路径
118         FileSystem hdfs = mypath.getFileSystem(conf);//FileSystem对象hdfs
119         if (hdfs.isDirectory(mypath))
120          {
121             hdfs.delete(mypath, true);
122         }
123         Job job = Job.getInstance();// 新建一个任务
124         job.setJarByClass(Email.class);// 主类
125         
126 
127 
128         job.setMapperClass(MailMapper.class);// Mapper
129         job.setReducerClass(MailReducer.class);// Reducer
130         
131         job.setOutputKeyClass(Text.class);// key输出类型
132         job.setOutputValueClass(IntWritable.class);// value输出类型
133         
134         FileInputFormat.addInputPath(job, new Path(arg0[0]));// 文件输入路径
135         FileOutputFormat.setOutputPath(job, new Path(arg0[1]));// 文件输出路径
136         job.waitForCompletion(true);
137         
138         return 0;
139     }
140 
141     
142     
143     public static void main(String[] args) throws Exception{
144         //集群路径            
145 //        String[] args0 = { "hdfs://HadoopMaster:9000/email/email.txt",
146 //                 "hdfs://HadoopMaster:9000/out/email"};
147         
148 //本地路径            
149         String[] args0 = { "./data/email/email.txt",
150                  "out/email/"};            
151         
152         int ec = ToolRunner.run( new Configuration(), new Email(), args0);
153         System. exit(ec);
154     }
155 }

 

相关文章: