第一题

下面是三种商品的销售数据 

Hadoop学习之路(二十七)MapReduce的API使用(四)

要求:根据以上数据,用 MapReduce 统计出如下数据:

1、每种商品的销售总金额,并降序排序

2、每种商品销售额最多的三周

 

 

第二题:MapReduce 题

现有如下数据文件需要处理:

格式:CSV

数据样例:

user_a,location_a,2018-01-01 08:00:00,60

user_a,location_a,2018-01-01 09:00:00,60

user_a,location_b,2018-01-01 10:00:00,60

user_a,location_a,2018-01-01 11:00:00,60

字段:用户 ID,位置 ID,开始时间,停留时长(分钟)

数据意义:某个用户在某个位置从某个时刻开始停留了多长时间

处理逻辑: 对同一个用户,在同一个位置,连续的多条记录进行合并

合并原则:开始时间取最早的,停留时长加和

要求:请编写 MapReduce 程序实现

其他:只有数据样例,没有数据。

UserLocationMR.java

  1 /**
  2 测试数据:
  3 user_a    location_a    2018-01-01 08:00:00    60
  4 user_a    location_a    2018-01-01 09:00:00    60
  5 user_a    location_a    2018-01-01 11:00:00    60
  6 user_a    location_a    2018-01-01 12:00:00    60
  7 user_a    location_b    2018-01-01 10:00:00    60
  8 user_a    location_c    2018-01-01 08:00:00    60
  9 user_a    location_c    2018-01-01 09:00:00    60
 10 user_a    location_c    2018-01-01 10:00:00    60
 11 user_b    location_a    2018-01-01 15:00:00    60
 12 user_b    location_a    2018-01-01 16:00:00    60
 13 user_b    location_a    2018-01-01 18:00:00    60
 14 
 15 
 16 结果数据:
 17 user_a    location_a    2018-01-01 08:00:00    120
 18 user_a    location_a    2018-01-01 11:00:00    120
 19 user_a    location_b    2018-01-01 10:00:00    60
 20 user_a    location_c    2018-01-01 08:00:00    180
 21 user_b    location_a    2018-01-01 15:00:00    120
 22 user_b    location_a    2018-01-01 18:00:00    60
 23 
 24 
 25  */
 26 public class UserLocationMR {
 27 
 28     public static void main(String[] args) throws Exception {
 29         // 指定hdfs相关的参数
 30         Configuration conf = new Configuration();
 31         //        conf.set("fs.defaultFS", "hdfs://hadoop02:9000");
 32         //        System.setProperty("HADOOP_USER_NAME", "hadoop");
 33 
 34         Job job = Job.getInstance(conf);
 35         // 设置jar包所在路径
 36         job.setJarByClass(UserLocationMR.class);
 37 
 38         // 指定mapper类和reducer类
 39         job.setMapperClass(UserLocationMRMapper.class);
 40         job.setReducerClass(UserLocationMRReducer.class);
 41 
 42         // 指定maptask的输出类型
 43         job.setMapOutputKeyClass(UserLocation.class);
 44         job.setMapOutputValueClass(NullWritable.class);
 45         // 指定reducetask的输出类型
 46         job.setOutputKeyClass(UserLocation.class);
 47         job.setOutputValueClass(NullWritable.class);
 48 
 49         job.setGroupingComparatorClass(UserLocationGC.class);
 50 
 51         // 指定该mapreduce程序数据的输入和输出路径
 52         Path inputPath = new Path("D:\\武文\\second\\input");
 53         Path outputPath = new Path("D:\\武文\\second\\output2");
 54         FileSystem fs = FileSystem.get(conf);
 55         if (fs.exists(outputPath)) {
 56             fs.delete(outputPath, true);
 57         }
 58         FileInputFormat.setInputPaths(job, inputPath);
 59         FileOutputFormat.setOutputPath(job, outputPath);
 60 
 61         // 最后提交任务
 62         boolean waitForCompletion = job.waitForCompletion(true);
 63         System.exit(waitForCompletion ? 0 : 1);
 64     }
 65 
 66     private static class UserLocationMRMapper extends Mapper<LongWritable, Text, UserLocation, NullWritable> {
 67 
 68         UserLocation outKey = new UserLocation();
 69 
 70         /**
 71          * value = user_a,location_a,2018-01-01 12:00:00,60
 72          */
 73         @Override
 74         protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
 75 
 76             String[] split = value.toString().split(",");
 77 
 78             outKey.set(split);
 79 
 80             context.write(outKey, NullWritable.get());
 81         }
 82     }
 83 
 84     private static class UserLocationMRReducer extends Reducer<UserLocation, NullWritable, UserLocation, NullWritable> {
 85 
 86         SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
 87 
 88         UserLocation outKey = new UserLocation();
 89 
 90         /**
 91          * user_a    location_a    2018-01-01 08:00:00    60
 92          * user_a    location_a    2018-01-01 09:00:00    60
 93          * user_a    location_a    2018-01-01 11:00:00    60
 94          * user_a    location_a    2018-01-01 12:00:00    60
 95          */
 96         @Override
 97         protected void reduce(UserLocation key, Iterable<NullWritable> values, Context context) throws IOException, InterruptedException {
 98 
 99             int count = 0;
100             for (NullWritable nvl : values) {
101                 count++;
102                 // 如果是这一组key-value中的第一个元素时,直接赋值给outKey对象。基础对象
103                 if (count == 1) {
104                     // 复制值
105                     outKey.set(key);
106                 } else {
107 
108                     // 有可能连续,有可能不连续,  连续则继续变量, 否则输出
109                     long current_timestamp = 0;
110                     long last_timestamp = 0;
111                     try {
112                         // 这是新遍历出来的记录的时间戳
113                         current_timestamp = sdf.parse(key.getTime()).getTime();
114                         // 这是上一条记录的时间戳 和 停留时间之和
115                         last_timestamp = sdf.parse(outKey.getTime()).getTime() + outKey.getDuration() * 60 * 1000;
116                     } catch (ParseException e) {
117                         e.printStackTrace();
118                     }
119 
120                     // 如果相等,证明是连续记录,所以合并
121                     if (current_timestamp == last_timestamp) {
122 
123                         outKey.setDuration(outKey.getDuration() + key.getDuration());
124 
125                     } else {
126 
127                         // 先输出上一条记录
128                         context.write(outKey, nvl);
129 
130                         // 然后再次记录当前遍历到的这一条记录
131                         outKey.set(key);
132                     }
133                 }
134             }
135             // 最后无论如何,还得输出最后一次
136             context.write(outKey, NullWritable.get());
137         }
138     }
139 }
View Code

相关文章: