第一题
下面是三种商品的销售数据
要求:根据以上数据,用 MapReduce 统计出如下数据:
1、每种商品的销售总金额,并降序排序
2、每种商品销售额最多的三周
第二题:MapReduce 题
现有如下数据文件需要处理:
格式:CSV
数据样例:
user_a,location_a,2018-01-01 08:00:00,60
user_a,location_a,2018-01-01 09:00:00,60
user_a,location_b,2018-01-01 10:00:00,60
user_a,location_a,2018-01-01 11:00:00,60
字段:用户 ID,位置 ID,开始时间,停留时长(分钟)
数据意义:某个用户在某个位置从某个时刻开始停留了多长时间
处理逻辑:对同一个用户,在同一个位置,连续的多条记录进行合并
合并原则:开始时间取最早的,停留时长加和
要求:请编写 MapReduce 程序实现
其他:只有数据样例,没有数据。
UserLocationMR.java
1 /** 2 测试数据: 3 user_a location_a 2018-01-01 08:00:00 60 4 user_a location_a 2018-01-01 09:00:00 60 5 user_a location_a 2018-01-01 11:00:00 60 6 user_a location_a 2018-01-01 12:00:00 60 7 user_a location_b 2018-01-01 10:00:00 60 8 user_a location_c 2018-01-01 08:00:00 60 9 user_a location_c 2018-01-01 09:00:00 60 10 user_a location_c 2018-01-01 10:00:00 60 11 user_b location_a 2018-01-01 15:00:00 60 12 user_b location_a 2018-01-01 16:00:00 60 13 user_b location_a 2018-01-01 18:00:00 60 14 15 16 结果数据: 17 user_a location_a 2018-01-01 08:00:00 120 18 user_a location_a 2018-01-01 11:00:00 120 19 user_a location_b 2018-01-01 10:00:00 60 20 user_a location_c 2018-01-01 08:00:00 180 21 user_b location_a 2018-01-01 15:00:00 120 22 user_b location_a 2018-01-01 18:00:00 60 23 24 25 */ 26 public class UserLocationMR { 27 28 public static void main(String[] args) throws Exception { 29 // 指定hdfs相关的参数 30 Configuration conf = new Configuration(); 31 // conf.set("fs.defaultFS", "hdfs://hadoop02:9000"); 32 // System.setProperty("HADOOP_USER_NAME", "hadoop"); 33 34 Job job = Job.getInstance(conf); 35 // 设置jar包所在路径 36 job.setJarByClass(UserLocationMR.class); 37 38 // 指定mapper类和reducer类 39 job.setMapperClass(UserLocationMRMapper.class); 40 job.setReducerClass(UserLocationMRReducer.class); 41 42 // 指定maptask的输出类型 43 job.setMapOutputKeyClass(UserLocation.class); 44 job.setMapOutputValueClass(NullWritable.class); 45 // 指定reducetask的输出类型 46 job.setOutputKeyClass(UserLocation.class); 47 job.setOutputValueClass(NullWritable.class); 48 49 job.setGroupingComparatorClass(UserLocationGC.class); 50 51 // 指定该mapreduce程序数据的输入和输出路径 52 Path inputPath = new Path("D:\\武文\\second\\input"); 53 Path outputPath = new Path("D:\\武文\\second\\output2"); 54 FileSystem fs = FileSystem.get(conf); 55 if (fs.exists(outputPath)) { 56 fs.delete(outputPath, true); 57 } 58 FileInputFormat.setInputPaths(job, inputPath); 59 FileOutputFormat.setOutputPath(job, outputPath); 60 61 // 最后提交任务 62 boolean waitForCompletion = 
job.waitForCompletion(true); 63 System.exit(waitForCompletion ? 0 : 1); 64 } 65 66 private static class UserLocationMRMapper extends Mapper<LongWritable, Text, UserLocation, NullWritable> { 67 68 UserLocation outKey = new UserLocation(); 69 70 /** 71 * value = user_a,location_a,2018-01-01 12:00:00,60 72 */ 73 @Override 74 protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException { 75 76 String[] split = value.toString().split(","); 77 78 outKey.set(split); 79 80 context.write(outKey, NullWritable.get()); 81 } 82 } 83 84 private static class UserLocationMRReducer extends Reducer<UserLocation, NullWritable, UserLocation, NullWritable> { 85 86 SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss"); 87 88 UserLocation outKey = new UserLocation(); 89 90 /** 91 * user_a location_a 2018-01-01 08:00:00 60 92 * user_a location_a 2018-01-01 09:00:00 60 93 * user_a location_a 2018-01-01 11:00:00 60 94 * user_a location_a 2018-01-01 12:00:00 60 95 */ 96 @Override 97 protected void reduce(UserLocation key, Iterable<NullWritable> values, Context context) throws IOException, InterruptedException { 98 99 int count = 0; 100 for (NullWritable nvl : values) { 101 count++; 102 // 如果是这一组key-value中的第一个元素时,直接赋值给outKey对象。基础对象 103 if (count == 1) { 104 // 复制值 105 outKey.set(key); 106 } else { 107 108 // 有可能连续,有可能不连续, 连续则继续变量, 否则输出 109 long current_timestamp = 0; 110 long last_timestamp = 0; 111 try { 112 // 这是新遍历出来的记录的时间戳 113 current_timestamp = sdf.parse(key.getTime()).getTime(); 114 // 这是上一条记录的时间戳 和 停留时间之和 115 last_timestamp = sdf.parse(outKey.getTime()).getTime() + outKey.getDuration() * 60 * 1000; 116 } catch (ParseException e) { 117 e.printStackTrace(); 118 } 119 120 // 如果相等,证明是连续记录,所以合并 121 if (current_timestamp == last_timestamp) { 122 123 outKey.setDuration(outKey.getDuration() + key.getDuration()); 124 125 } else { 126 127 // 先输出上一条记录 128 context.write(outKey, nvl); 129 130 // 然后再次记录当前遍历到的这一条记录 131 outKey.set(key); 132 } 
133 } 134 } 135 // 最后无论如何,还得输出最后一次 136 context.write(outKey, NullWritable.get()); 137 } 138 } 139 }