【问题标题】:How to write MapReduce code for such scenario?如何为这种场景编写 MapReduce 代码?
【发布时间】:2015-08-25 13:18:26
【问题描述】:

假设我有如下输入文件

dept_id emp_id  salary
1       13611   1234
2       13609   3245
3       13612   3251
2       13623   1232
1       13619   6574
3       13421   234

现在我想找到每个部门的平均工资。像下面的 Hive 查询:

SELECT dept_id, avg(salary) FROM dept GROUP BY dept_id

这将返回输出:

dept_id avg_sal
----------------
  1     3904.0
  2     2238.5
  3     1742.5

现在,我想做的是生成相同的输出,但使用 mapreduce 框架。那么怎么写呢?提前致谢!

【问题讨论】:

  • 尝试一下,发布您尝试过的内容,我们会提供帮助。
  • 你可以先研究一下map reduce的“字数”...
  • @vefthym 我再次编辑问题

标签: java hadoop mapreduce hive


【解决方案1】:

重要提示: 在尝试实现这一点之前,首先尝试一些 MapReduce 中的基本示例,例如实现字数统计程序,以了解其逻辑,甚至在此之前,阅读有关 MapReduce 工作原理的书籍或教程。

聚合数据(例如求平均值)的想法是在映射阶段按键(部门 ID)分组,然后在缩减阶段减少特定部门的所有薪水。

以更形式化的方式:

地图:

输入:表示工资记录的一行(即,dep_id、emp_id、salary)
输出(键,值):(dep_id,工资)

减少:

输入(键,值):(dep_id,薪水:具有此dep_id的薪水值列表)
输出(key, value): (dep_id, avg(salaries))

这样,属于同一个部门的所有工资都将由同一个reducer处理。在 reducer 中,您所要做的就是找到输入值的平均值。

【讨论】:

  • 非常感谢@vefthym,您以非常简单的方式解释了这一点。现在我一定会构建这段代码。
【解决方案2】:

代码----

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.FloatWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class AverageSalary {
  public static class AvgMapper
       extends Mapper<Object, Text, Text, FloatWritable>{
    private Text dept_id = new Text();
    private FloatWritable salary = new FloatWritable(); 
    public void map(Object key, Text value, Context context
                    ) throws IOException, InterruptedException {
        String values[] = value.toString().split("\t");
        dept_id.set(values[0]);
        salary.set(Float.parseFloat(values[2]));
        context.write(dept_id, salary);
    }
  }

  public static class AvgReducer
       extends Reducer<Text,FloatWritable,Text,FloatWritable> {
    private FloatWritable result = new FloatWritable();

    public void reduce(Text key, Iterable<FloatWritable> values,
                       Context context
                       ) throws IOException, InterruptedException {
      float sum = 0;
      float count = 0;
      for (FloatWritable val : values) {
        sum += val.get();
        count++;
      }
      result.set(sum/count);
      context.write(key, result);
    }
  }

  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    Job job = Job.getInstance(conf, "average salary");
    job.setJarByClass(AverageSalary.class);
    job.setMapperClass(AvgMapper.class);
    job.setCombinerClass(AvgReducer.class);
    job.setReducerClass(AvgReducer.class);
    job.setOutputKeyClass(Text.class);
    job.setOutputValueClass(FloatWritable.class);
    FileInputFormat.addInputPath(job, new Path("/home/kishore/Data/mapreduce.txt"));  // input path
    FileOutputFormat.setOutputPath(job, new Path("/home/kishore/Data/map3")); // output path
    System.exit(job.waitForCompletion(true) ? 0 : 1);
  }
}

output 
1   3904.0
2   2238.5
3   1742.5

【讨论】:

  • Krish : 你的输入路径和输出路径应该在 HDFS 中?应该创建 map3 目录还是需要新的输出目录?
  • 用 hdfs 替换这个 /home/kishore/Data/mapreduce.txt;//localhost:9000/filename 与输出相同
【解决方案3】:

如果您还没有参加过任何培训计划,请访问 edureka 提供的免费视频,以更好地理解概念:Map Reduce

映射器

Mapper 将输入键/值对映射到一组中间键/值对。

映射是将输入记录转换为中间记录的单独任务。转换后的中间记录不需要与输入记录的类型相同。一个给定的输入对可以映射到零个或多个输出对。

减速器

Reducer 将一组共享一个键的中间值缩减为一组较小的值。

作业的 reduce 数量由用户通过 Job.setNumReduceTasks(int) 设置。

Apache Hadoop 网站上的工作示例:Word Count example

对于您的用例,简单的使用字数统计示例是不够的。 由于您使用的是 Group by,因此您必须在 Mapper 上使用组合器和分区器。观看此视频:Advanced Map reduce

【讨论】:

    猜你喜欢
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2017-09-27
    • 1970-01-01
    • 2022-11-24
    • 1970-01-01
    • 1970-01-01
    • 2011-02-12
    相关资源
    最近更新 更多