数据及需求
数据格式
movies.dat 3884条数据
1::Toy Story (1995)::Animation|Children's|Comedy
2::Jumanji (1995)::Adventure|Children's|Fantasy
3::Grumpier Old Men (1995)::Comedy|Romance
4::Waiting to Exhale (1995)::Comedy|Drama
5::Father of the Bride Part II (1995)::Comedy
6::Heat (1995)::Action|Crime|Thriller
7::Sabrina (1995)::Comedy|Romance
8::Tom and Huck (1995)::Adventure|Children's
9::Sudden Death (1995)::Action
10::GoldenEye (1995)::Action|Adventure|Thriller
users.dat 6041条数据
1::F::1::10::48067
2::M::56::16::70072
3::M::25::15::55117
4::M::45::7::02460
5::M::25::20::55455
6::F::50::9::55117
7::M::35::1::06810
8::M::25::12::11413
9::M::25::17::61614
10::F::35::1::95370
ratings.dat 1000210条数据
1::1193::5::978300760
1::661::3::978302109
1::914::3::978301968
1::3408::4::978300275
1::2355::5::978824291
1::1197::3::978302268
1::1287::5::978302039
1::2804::5::978300719
1::594::4::978302268
1::919::4::978301368
数据解释
1、users.dat 数据格式为: 2::M::56::16::70072
对应字段为:UserID BigInt, Gender String, Age Int, Occupation String, Zipcode String
对应字段中文解释:用户id,性别,年龄,职业,邮政编码
2、movies.dat 数据格式为: 2::Jumanji (1995)::Adventure|Children's|Fantasy
对应字段为:MovieID BigInt, Title String, Genres String
对应字段中文解释:电影ID,电影名字,电影类型
3、ratings.dat 数据格式为: 1::1193::5::978300760
对应字段为:UserID BigInt, MovieID BigInt, Rating Double, Timestamp String
对应字段中文解释:用户ID,电影ID,评分,评分时间戳
三个文件关联后的完整字段(中文):用户ID,电影ID,评分,评分时间戳,性别,年龄,职业,邮政编码,电影名字,电影类型
对应英文字段名:userid, movieId, rate, ts, gender, age, occupation, zipcode, movieName, movieType
需求统计
(1)求被评分次数最多的10部电影,并给出评分次数(电影名,评分次数)
(2)分别求男性,女性当中评分最高的10部电影(性别,电影名,评分)
(3)求movieid = 2116这部电影各年龄段的平均影评(年龄段,评分)。数据中年龄只有7个取值,直接按这7个值分组即可
(4)求最喜欢看电影(影评次数最多)的那位女性评最高分的10部电影的平均影评分(人,电影名,影评)
(5)求好片(评分>=4.0)最多的那个年份的最好看的10部电影
(6)求1997年上映的电影中,评分最高的10部Comedy类电影
(7)该影评库中各种类型电影中评价最高的5部电影(类型,电影名,平均影评分)
(8)各年评分最高的电影类型(年份,类型,影评分)
(9)每个地区最高评分的电影名,把结果存入HDFS(地区,电影名,电影评分)
代码实现
1、求被评分次数最多的10部电影,并给出评分次数(电影名,评分次数)
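分析:此问题涉及2个文件:ratings.dat和movies.dat。两个文件的数据量相差悬殊(ratings.dat约100万条,movies.dat仅约3900条),因此适合使用mapjoin(map端连接):先将数据量较小的movies.dat预先加载到内存中,再在map阶段逐条关联ratings.dat,从而避免reduce端join带来的大量shuffle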
MovieMR1_1.java
1 public class MovieMR1_1 { 2 3 public static void main(String[] args) throws Exception { 4 5 if(args.length < 4) { 6 args = new String[4]; 7 args[0] = "/movie/input/"; 8 args[1] = "/movie/output/"; 9 args[2] = "/movie/cache/movies.dat"; 10 args[3] = "/movie/output_last/"; 11 } 12 13 14 Configuration conf1 = new Configuration(); 15 conf1.set("fs.defaultFS", "hdfs://hadoop1:9000/"); 16 System.setProperty("HADOOP_USER_NAME", "hadoop"); 17 FileSystem fs1 = FileSystem.get(conf1); 18 19 20 Job job1 = Job.getInstance(conf1); 21 22 job1.setJarByClass(MovieMR1_1.class); 23 24 job1.setMapperClass(MoviesMapJoinRatingsMapper1.class); 25 job1.setReducerClass(MovieMR1Reducer1.class); 26 27 job1.setMapOutputKeyClass(Text.class); 28 job1.setMapOutputValueClass(IntWritable.class); 29 30 job1.setOutputKeyClass(Text.class); 31 job1.setOutputValueClass(IntWritable.class); 32 33 34 35 //缓存普通文件到task运行节点的工作目录 36 URI uri = new URI("hdfs://hadoop1:9000"+args[2]); 37 System.out.println(uri); 38 job1.addCacheFile(uri); 39 40 41 Path inputPath1 = new Path(args[0]); 42 Path outputPath1 = new Path(args[1]); 43 if(fs1.exists(outputPath1)) { 44 fs1.delete(outputPath1, true); 45 } 46 FileInputFormat.setInputPaths(job1, inputPath1); 47 FileOutputFormat.setOutputPath(job1, outputPath1); 48 49 boolean isDone = job1.waitForCompletion(true); 50 System.exit(isDone ? 
0 : 1); 51 52 } 53 54 public static class MoviesMapJoinRatingsMapper1 extends Mapper<LongWritable, Text, Text, IntWritable>{ 55 56 //用了存放加载到内存中的movies.dat数据 57 private static Map<String,String> movieMap = new HashMap<>(); 58 //key:电影ID 59 Text outKey = new Text(); 60 //value:电影名+电影类型 61 IntWritable outValue = new IntWritable(); 62 63 64 /** 65 * movies.dat: 1::Toy Story (1995)::Animation|Children's|Comedy 66 * 67 * 68 * 将小表(movies.dat)中的数据预先加载到内存中去 69 * */ 70 @Override 71 protected void setup(Context context) throws IOException, InterruptedException { 72 73 Path[] localCacheFiles = context.getLocalCacheFiles(); 74 75 76 String strPath = localCacheFiles[0].toUri().toString(); 77 78 BufferedReader br = new BufferedReader(new FileReader(strPath)); 79 String readLine; 80 while((readLine = br.readLine()) != null) { 81 82 String[] split = readLine.split("::"); 83 String movieId = split[0]; 84 String movieName = split[1]; 85 String movieType = split[2]; 86 87 movieMap.put(movieId, movieName+"\t"+movieType); 88 } 89 90 br.close(); 91 } 92 93 94 /** 95 * movies.dat: 1 :: Toy Story (1995) :: Animation|Children's|Comedy 96 * 电影ID 电影名字 电影类型 97 * 98 * ratings.dat: 1 :: 1193 :: 5 :: 978300760 99 * 用户ID 电影ID 评分 评分时间戳 100 * 101 * value: ratings.dat读取的数据 102 * */ 103 @Override 104 protected void map(LongWritable key, Text value, Context context) 105 throws IOException, InterruptedException { 106 107 String[] split = value.toString().split("::"); 108 109 String userId = split[0]; 110 String movieId = split[1]; 111 String movieRate = split[2]; 112 113 //根据movieId从内存中获取电影名和类型 114 String movieNameAndType = movieMap.get(movieId); 115 String movieName = movieNameAndType.split("\t")[0]; 116 String movieType = movieNameAndType.split("\t")[1]; 117 118 outKey.set(movieName); 119 outValue.set(Integer.parseInt(movieRate)); 120 121 context.write(outKey, outValue); 122 123 } 124 125 } 126 127 128 public static class MovieMR1Reducer1 extends Reducer<Text, IntWritable, Text, IntWritable>{ 129 
//每部电影评论的次数 130 int count; 131 //评分次数 132 IntWritable outValue = new IntWritable(); 133 134 @Override 135 protected void reduce(Text key, Iterable<IntWritable> values,Context context) throws IOException, InterruptedException { 136 137 count = 0; 138 139 for(IntWritable value : values) { 140 count++; 141 } 142 143 outValue.set(count); 144 145 context.write(key, outValue); 146 } 147 148 } 149 150 151 }