一、MapReduce编程案例:求部门的工资总额

1、员工表
SQL:select deptno,sum(sal) from emp group by deptno;
2、分析数据处理的过程
3、开发程序
4、求每个部门的平均工资
BigData-07:MapReduce基础
emp员工表:

员工编号 员工姓名 员工职位 员工老板编号 员工入职日期 月薪资 奖金 部门编号
7369 SMITH CLERK 7902 1980/12/17 800 0 20
7499 ALLEN SALESMAN 7698 1981/2/20 1600 300 30
7521 WARD SALESMAN 7698 1981/2/22 1250 500 30
7566 JONES MANAGER 7839 1981/4/2 2975 0 20
7654 MARTIN SALESMAN 7698 1981/9/28 1250 1400 30
7698 BLAKE MANAGER 7839 1981/5/1 2850 0 30
7782 CLARK MANAGER 7839 1981/6/9 2450 0 10
7788 SCOTT ANALYST 7566 1987/4/19 3000 0 20
7839 KING PRESIDENT -1 1981/11/17 5000 0 10
7844 TURNER SALESMAN 7698 1981/9/8 1500 0 30
7876 ADAMS CLERK 7788 1987/5/23 1100 0 20
7900 JAMES CLERK 7698 1981/12/3 950 0 30
7902 FORD ANALYST 7566 1981/12/3 3000 0 20
7934 MILLER CLERK 7782 1982/1/23 1300 0 10

dept部门表:

部门编号 部门名称 部门所在地
10 ACCOUNTING NEW YORK
20 RESEARCH DALLAS
30 SALES CHICAGO
40 OPERATIONS BOSTON

代码实现:

//                                              k1          v1      k2          v2
public class SalaryTotalMapper extends Mapper<LongWritable, Text, IntWritable, IntWritable> {
    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        // 数据:7369,SMITH,CLERK,7902,1980/12/17,800,0,20
        String line = value.toString();

        // 切分
        String[] words = line.split(",");

        // 输出          k2:部门号      v2:员工工资
        context.write(new IntWritable(Integer.parseInt(words[7])),
                new IntWritable(Integer.parseInt(words[5])));
    }
}
//                                                 k3        v3          k4          v4
public class SalaryTotalReducer extends Reducer<IntWritable,IntWritable,IntWritable,IntWritable> {
    @Override
    protected void reduce(IntWritable key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
        // 对v3求和
        int total = 0;
        for (IntWritable value : values) {
            total += value.get();
        }

        // 输出k4部门号  v4部门的工资总额
        context.write(key, new IntWritable(total));
    }
}
public class SalaryTotalMain {
    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
        // 1.创建任务
        Job job = Job.getInstance(new Configuration());
        // 程序的入口
        job.setJarByClass(SalaryTotalMain.class);

        // 2. 指定map类和map的输出类型
        job.setMapperClass(SalaryTotalMapper.class);
        job.setMapOutputKeyClass(IntWritable.class);
        job.setMapOutputValueClass(IntWritable.class);

        // 3. 指定reduce类和reduce输出类型(就是最终结果的输出类型)
        job.setReducerClass(SalaryTotalReducer.class);
        job.setOutputKeyClass(IntWritable.class);
        job.setOutputValueClass(IntWritable.class);

        // 4. 指定输入路径,任务结果输出路径
        FileInputFormat.setInputPaths(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));

        // 5. 执行任务
        job.waitForCompletion(true);
    }
}

二、MapReduce的高级特性

1、序列化
	(*) 复习:Java的序列化
	(*) MapReduce的序列化:核心接口:Writable
	                       如果一个类实现了Writable接口,该类的对象可以作为key和value
			举例1:读取员工数据,生成员工的对象,直接输出到HDFS
			举例2:使用MapReduce序列化重写“求部门工资的总额的例子”
			
2、排序
	(*)规则:按照Key2排序
	(*)基本数据类型
			(1)数字:   默认:升序
			              可以改变默认的排序规则(创建自己的比较器即可)
						  
			(2)字符串: 默认:字典顺序
			              可以改变默认的排序规则(创建自己的比较器即可)
	
	(*)对象

3、分区
4、合并

三、MapReduce核心:Shuffle(洗牌)

相关文章: