Data and Requirements

Data Format

movies.dat  3884 records

1::Toy Story (1995)::Animation|Children's|Comedy
2::Jumanji (1995)::Adventure|Children's|Fantasy
3::Grumpier Old Men (1995)::Comedy|Romance
4::Waiting to Exhale (1995)::Comedy|Drama
5::Father of the Bride Part II (1995)::Comedy
6::Heat (1995)::Action|Crime|Thriller
7::Sabrina (1995)::Comedy|Romance
8::Tom and Huck (1995)::Adventure|Children's
9::Sudden Death (1995)::Action
10::GoldenEye (1995)::Action|Adventure|Thriller

users.dat  6041 records

1::F::1::10::48067
2::M::56::16::70072
3::M::25::15::55117
4::M::45::7::02460
5::M::25::20::55455
6::F::50::9::55117
7::M::35::1::06810
8::M::25::12::11413
9::M::25::17::61614
10::F::35::1::95370

ratings.dat  1000210 records

1::1193::5::978300760
1::661::3::978302109
1::914::3::978301968
1::3408::4::978300275
1::2355::5::978824291
1::1197::3::978302268
1::1287::5::978302039
1::2804::5::978300719
1::594::4::978302268
1::919::4::978301368

Data Description

1. users.dat record format: 2::M::56::16::70072
Fields: UserID BigInt, Gender String, Age Int, Occupation String, Zipcode String
Meaning: user ID, gender, age, occupation, zip code

2. movies.dat record format: 2::Jumanji (1995)::Adventure|Children's|Fantasy
Fields: MovieID BigInt, Title String, Genres String
Meaning: movie ID, movie title, movie genres

3. ratings.dat record format: 1::1193::5::978300760
Fields: UserID BigInt, MovieID BigInt, Rating Double, Timestamped String
Meaning: user ID, movie ID, rating, rating timestamp
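All three files use "::" as the field separator, and the genre list inside movies.dat is further separated by "|". A minimal Java sketch of parsing one line of each file into typed objects, following the field types listed above, might look like this. The class, field, and method names are hypothetical and only illustrate the layout.

```java
/**
 * Hypothetical parsers for the three "::"-separated MovieLens files.
 * Field types follow the schema listed above; names are illustrative only.
 */
public class MovieLensRecords {

    static final class Movie {
        final long movieId;
        final String title;
        final String[] genres;

        Movie(long movieId, String title, String[] genres) {
            this.movieId = movieId;
            this.title = title;
            this.genres = genres;
        }
    }

    static final class User {
        final long userId;
        final String gender;
        final int age;
        final String occupation;
        final String zipcode; // kept as String so leading zeros such as "02460" survive

        User(long userId, String gender, int age, String occupation, String zipcode) {
            this.userId = userId;
            this.gender = gender;
            this.age = age;
            this.occupation = occupation;
            this.zipcode = zipcode;
        }
    }

    static final class Rating {
        final long userId;
        final long movieId;
        final double rating;
        final String timestamp;

        Rating(long userId, long movieId, double rating, String timestamp) {
            this.userId = userId;
            this.movieId = movieId;
            this.rating = rating;
            this.timestamp = timestamp;
        }
    }

    // "::" contains no regex metacharacters, so String.split can take it as-is
    static Movie parseMovie(String line) {
        String[] f = line.split("::");
        // "|" is a regex metacharacter, so it has to be escaped
        return new Movie(Long.parseLong(f[0]), f[1], f[2].split("\\|"));
    }

    static User parseUser(String line) {
        String[] f = line.split("::");
        return new User(Long.parseLong(f[0]), f[1], Integer.parseInt(f[2]), f[3], f[4]);
    }

    static Rating parseRating(String line) {
        String[] f = line.split("::");
        return new Rating(Long.parseLong(f[0]), Long.parseLong(f[1]), Double.parseDouble(f[2]), f[3]);
    }

    public static void main(String[] args) {
        Movie m = parseMovie("2::Jumanji (1995)::Adventure|Children's|Fantasy");
        System.out.println(m.title + " " + String.join(",", m.genres));
        User u = parseUser("4::M::45::7::02460");
        System.out.println(u.userId + " " + u.zipcode);
        Rating r = parseRating("1::1193::5::978300760");
        System.out.println(r.movieId + " " + r.rating);
    }
}
```

Keeping Zipcode as a String matters for records such as user 4, whose zip code 02460 would lose its leading zero if parsed as a number.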

Fields of the joined record: user ID, movie ID, rating, rating timestamp, gender, age, occupation, zip code, movie title, movie genres
userid, movieId, rate, ts, gender, age, occupation, zipcode, movieName, movieType

Requirements

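(1) Find the 10 movies that have been rated the most times, and give their rating counts (movie title, rating count)
(2) For men and for women separately, find the 10 highest-rated movies (gender, movie title, rating)
(3) For the movie with movieid = 2116, find the average rating in each age group (there are only 7 distinct age values, so simply group by those 7) (age group, rating)
(4) Find the woman who watches the most movies (the most ratings), take the 10 movies she rated highest, and give their average ratings (user, movie title, rating)
(5) Find the year with the most good movies (rating >= 4.0), and list the 10 best movies of that year
(6) Among movies released in 1997, find the 10 highest-rated Comedy movies
(7) For each genre in this rating database, find the 5 highest-rated movies (genre, movie title, average rating)
(8) For each year, find the highest-rated movie genre (year, genre, rating)
(9) For each region, find the highest-rated movie title and store the result in HDFS (region, movie title, rating)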
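Requirements (5), (6), and (8) need a release year, but movies.dat has no separate year column: the year only appears in parentheses at the end of the title, as in "Toy Story (1995)". A small sketch of extracting it with a regular expression, assuming every title ends with "(yyyy)" like the samples above (the class and method names are hypothetical):

```java
import java.util.regex.Matcher;
import java.util.regex.Pattern;

public class ReleaseYear {

    // Four-digit year in parentheses at the very end of the title
    private static final Pattern YEAR_AT_END = Pattern.compile("\\((\\d{4})\\)\\s*$");

    /** Returns the release year parsed from the title, or -1 if no trailing "(yyyy)" is found. */
    static int yearOf(String title) {
        Matcher m = YEAR_AT_END.matcher(title);
        return m.find() ? Integer.parseInt(m.group(1)) : -1;
    }

    public static void main(String[] args) {
        System.out.println(yearOf("Toy Story (1995)"));
        System.out.println(yearOf("Father of the Bride Part II (1995)"));
        System.out.println(yearOf("Untitled"));
    }
}
```

Anchoring the pattern at the end of the string keeps it from picking up other parenthesized text earlier in a title, such as an alternate-language name.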

Implementation

1. Find the 10 movies that have been rated the most times, and give their rating counts (movie title, rating count)

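Analysis: this question involves two files, ratings.dat and movies.dat, whose sizes are very unbalanced (about 3,900 movie records against about 1,000,000 ratings). A map-side join (mapjoin) fits this case: the smaller file is loaded into memory before the mappers start, and each ratings.dat record is joined against it inside map(), so movies.dat never has to go through the shuffle. The job below outputs the number of ratings for every movie; picking the 10 largest counts is a separate step.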
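The idea of a map-side join can be shown without Hadoop: build a hash map from the small table once (what setup() does), then stream the large table and look each key up (what map() does). The following is a plain-Java sketch under that simplification; the class name and the rating lines in main are made-up toy input, not real data.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class MapSideJoinSketch {

    /** Small table: builds movieId -> title from movies.dat lines (done once, like setup()). */
    static Map<String, String> loadMovies(List<String> movieLines) {
        Map<String, String> titles = new HashMap<>();
        for (String line : movieLines) {
            String[] f = line.split("::");
            titles.put(f[0], f[1]);
        }
        return titles;
    }

    /** Large table: streams ratings.dat lines and emits "title\trating" per match (like map()). */
    static List<String> join(Map<String, String> titles, List<String> ratingLines) {
        List<String> out = new ArrayList<>();
        for (String line : ratingLines) {
            String[] f = line.split("::");
            String title = titles.get(f[1]);
            if (title != null) { // ratings whose movie is missing from the small table are dropped
                out.add(title + "\t" + f[2]);
            }
        }
        return out;
    }

    public static void main(String[] args) {
        Map<String, String> titles = loadMovies(List.of(
                "1::Toy Story (1995)::Animation|Children's|Comedy",
                "2::Jumanji (1995)::Adventure|Children's|Fantasy"));
        // Toy rating lines; the third refers to a movie that is not in the small table
        List<String> joined = join(titles, List.of(
                "1::1::5::978300760",
                "1::2::3::978302109",
                "1::999::4::978301968"));
        joined.forEach(System.out::println);
    }
}
```

Because the lookup is a single hash-map access per rating, the join costs one pass over ratings.dat, and only the already-joined (title, rating) pairs reach the reducers.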

MovieMR1_1.java

```java
import java.io.BufferedReader;
import java.io.FileReader;
import java.io.IOException;
import java.net.URI;
import java.util.HashMap;
import java.util.Map;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class MovieMR1_1 {

    public static void main(String[] args) throws Exception {

        if (args.length < 4) {
            args = new String[4];
            args[0] = "/movie/input/";            // ratings.dat (large table)
            args[1] = "/movie/output/";           // per-movie rating counts
            args[2] = "/movie/cache/movies.dat";  // small table, shipped via the cache
            args[3] = "/movie/output_last/";      // not used by this job
        }

        Configuration conf1 = new Configuration();
        conf1.set("fs.defaultFS", "hdfs://hadoop1:9000/");
        System.setProperty("HADOOP_USER_NAME", "hadoop");
        FileSystem fs1 = FileSystem.get(conf1);

        Job job1 = Job.getInstance(conf1);
        job1.setJarByClass(MovieMR1_1.class);

        job1.setMapperClass(MoviesMapJoinRatingsMapper1.class);
        job1.setReducerClass(MovieMR1Reducer1.class);

        job1.setMapOutputKeyClass(Text.class);
        job1.setMapOutputValueClass(IntWritable.class);
        job1.setOutputKeyClass(Text.class);
        job1.setOutputValueClass(IntWritable.class);

        // Cache a regular file into the working directory of the nodes that run the tasks
        URI uri = new URI("hdfs://hadoop1:9000" + args[2]);
        System.out.println(uri);
        job1.addCacheFile(uri);

        Path inputPath1 = new Path(args[0]);
        Path outputPath1 = new Path(args[1]);
        if (fs1.exists(outputPath1)) {
            fs1.delete(outputPath1, true);
        }
        FileInputFormat.setInputPaths(job1, inputPath1);
        FileOutputFormat.setOutputPath(job1, outputPath1);

        boolean isDone = job1.waitForCompletion(true);
        System.exit(isDone ? 0 : 1);
    }

    public static class MoviesMapJoinRatingsMapper1 extends Mapper<LongWritable, Text, Text, IntWritable> {

        // movies.dat loaded into memory: movieId -> "movieName\tmovieType"
        private final Map<String, String> movieMap = new HashMap<>();
        // key: movie name
        private final Text outKey = new Text();
        // value: the rating from the current ratings.dat record
        private final IntWritable outValue = new IntWritable();

        /**
         * movies.dat: 1::Toy Story (1995)::Animation|Children's|Comedy
         *
         * Load the small table (movies.dat) into memory before any map() call.
         */
        @Override
        protected void setup(Context context) throws IOException, InterruptedException {
            // getLocalCacheFiles() is deprecated in Hadoop 2.x but still available
            Path[] localCacheFiles = context.getLocalCacheFiles();
            String strPath = localCacheFiles[0].toUri().getPath();

            try (BufferedReader br = new BufferedReader(new FileReader(strPath))) {
                String readLine;
                while ((readLine = br.readLine()) != null) {
                    String[] split = readLine.split("::");
                    // movieId -> movie name + "\t" + movie genres
                    movieMap.put(split[0], split[1] + "\t" + split[2]);
                }
            }
        }

        /**
         * movies.dat:  1 :: Toy Story (1995) :: Animation|Children's|Comedy
         *              movie ID, movie title, movie genres
         *
         * ratings.dat: 1 :: 1193 :: 5 :: 978300760
         *              user ID, movie ID, rating, rating timestamp
         *
         * value: one line read from ratings.dat
         */
        @Override
        protected void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            String[] split = value.toString().split("::");
            String movieId = split[1];
            String movieRate = split[2];

            // Look up the movie name and genres in memory by movieId
            String movieNameAndType = movieMap.get(movieId);
            if (movieNameAndType == null) {
                return; // no matching movie in movies.dat: skip instead of throwing an NPE
            }
            String movieName = movieNameAndType.split("\t")[0];

            outKey.set(movieName);
            outValue.set(Integer.parseInt(movieRate));
            context.write(outKey, outValue);
        }
    }

    public static class MovieMR1Reducer1 extends Reducer<Text, IntWritable, Text, IntWritable> {
        // number of ratings the movie received
        private final IntWritable outValue = new IntWritable();

        @Override
        protected void reduce(Text key, Iterable<IntWritable> values, Context context)
                throws IOException, InterruptedException {
            int count = 0;
            for (IntWritable ignored : values) {
                count++;
            }
            outValue.set(count);
            context.write(key, outValue);
        }
    }
}
```
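The job above writes one "title\tcount" line per movie and stops there; the 10 most-rated movies still have to be picked out. One common way is a min-heap capped at N entries, sketched below in plain Java. The class name and the input lines are hypothetical. The same heap logic could be placed in the cleanup() of a single reducer, or in a second job that reads /movie/output/ and writes to args[3] (/movie/output_last/), which the code above declares but does not use; that wiring is an assumption, not something shown in the original code.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.PriorityQueue;

public class TopNByCount {

    static final class Entry {
        final String title;
        final int count;

        Entry(String title, int count) {
            this.title = title;
            this.count = count;
        }

        @Override
        public String toString() {
            return title + "\t" + count;
        }
    }

    /** Keeps the n entries with the largest counts using a size-bounded min-heap. */
    static List<Entry> topN(List<String> lines, int n) {
        // Smallest count sits at the head, so it is the one evicted when the heap overflows
        PriorityQueue<Entry> heap = new PriorityQueue<>(Comparator.comparingInt((Entry e) -> e.count));
        for (String line : lines) {
            String[] f = line.split("\t");
            heap.offer(new Entry(f[0], Integer.parseInt(f[1])));
            if (heap.size() > n) {
                heap.poll();
            }
        }
        // Heap iteration order is unspecified, so sort the survivors by descending count
        List<Entry> result = new ArrayList<>(heap);
        result.sort(Comparator.comparingInt((Entry e) -> e.count).reversed());
        return result;
    }

    public static void main(String[] args) {
        // Hypothetical job output lines in the "title\tcount" format
        List<String> lines = List.of("Movie A\t3", "Movie B\t10", "Movie C\t7", "Movie D\t1");
        topN(lines, 2).forEach(System.out::println);
    }
}
```

The heap holds at most N+1 entries, so memory stays constant no matter how many movies there are; ties on the count are broken arbitrarily.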