求所有两两用户之间的共同好友
数据格式
A:B,C,D,F,E,O
B:A,C,E,K
C:F,A,D,I
D:A,E,F,L
E:B,C,D,M,L
F:A,B,C,D,E,O,M
G:A,C,D,E,F
H:A,C,D,E,O
I:A,O
J:B,O
K:A,C,D
L:D,E,F
M:E,F,G
O:A,H,I,J,K
以上是数据:
A:B,C,D,F,E,O
表示:B,C,D,E,F,O是A用户的好友。
1 public class SharedFriend { 2 /* 3 第一阶段的map函数主要完成以下任务 4 1.遍历原始文件中每行<所有朋友>信息 5 2.遍历“朋友”集合,以每个“朋友”为键,原来的“人”为值 即输出<朋友,人> 6 */ 7 static class SharedFriendMapper01 extends Mapper<LongWritable, Text, Text, Text>{ 8 @Override 9 protected void map(LongWritable key, Text value,Context context) 10 throws IOException, InterruptedException { 11 String line = value.toString(); 12 String[] person_friends = line.split(":"); 13 String person = person_friends[0]; 14 String[] friends = person_friends[1].split(","); 15 16 for(String friend : friends){ 17 context.write(new Text(friend), new Text(person)); 18 } 19 } 20 } 21 22 /* 23 第一阶段的reduce函数主要完成以下任务 24 1.对所有传过来的<朋友,list(人)>进行拼接,输出<朋友,拥有这名朋友的所有人> 25 */ 26 static class SharedFriendReducer01 extends Reducer<Text, Text, Text, Text>{ 27 @Override 28 protected void reduce(Text key, Iterable<Text> values,Context context) 29 throws IOException, InterruptedException { 30 StringBuffer sb = new StringBuffer(); 31 for(Text friend : values){ 32 sb.append(friend.toString()).append(","); 33 } 34 sb.deleteCharAt(sb.length()-1); 35 context.write(key, new Text(sb.toString())); 36 } 37 } 38 39 /* 40 第二阶段的map函数主要完成以下任务 41 1.将上一阶段reduce输出的<朋友,拥有这名朋友的所有人>信息中的 “拥有这名朋友的所有人”进行排序 ,以防出现B-C C-B这样的重复 42 2.将 “拥有这名朋友的所有人”进行两两配对,并将配对后的字符串当做键,“朋友”当做值输出,即输出<人-人,共同朋友> 43 */ 44 static class SharedFriendMapper02 extends Mapper<LongWritable, Text, Text, Text>{ 45 @Override 46 protected void map(LongWritable key, Text value,Context context) 47 throws IOException, InterruptedException { 48 String line = value.toString(); 49 String[] friend_persons = line.split("\t"); 50 String friend = friend_persons[0]; 51 String[] persons = friend_persons[1].split(","); 52 Arrays.sort(persons); //排序 53 54 //两两配对 55 for(int i=0;i<persons.length-1;i++){ 56 for(int j=i+1;j<persons.length;j++){ 57 context.write(new Text(persons[i]+"-"+persons[j]+":"), new Text(friend)); 58 } 59 } 60 } 61 } 62 63 /* 64 第二阶段的reduce函数主要完成以下任务 65 1.<人-人,list(共同朋友)> 中的“共同好友”进行拼接 最后输出<人-人,两人的所有共同好友> 66 */ 67 static class SharedFriendReducer02 extends Reducer<Text, Text, Text, Text>{ 68 @Override 69 protected void reduce(Text key, Iterable<Text> values,Context context) 70 throws IOException, InterruptedException { 71 StringBuffer sb = new StringBuffer(); 72 Set<String> set = new HashSet<String>(); 73 for(Text friend : values){ 74 if(!set.contains(friend.toString())) 75 set.add(friend.toString()); 76 } 77 for(String friend : set){ 78 sb.append(friend.toString()).append(","); 79 } 80 sb.deleteCharAt(sb.length()-1); 81 82 context.write(key, new Text(sb.toString())); 83 } 84 } 85 86 public static void main(String[] args)throws Exception { 87 Configuration conf = new Configuration(); 88 89 //第一阶段 90 Job job1 = Job.getInstance(conf); 91 job1.setJarByClass(SharedFriend.class); 92 job1.setMapperClass(SharedFriendMapper01.class); 93 job1.setReducerClass(SharedFriendReducer01.class); 94 95 job1.setOutputKeyClass(Text.class); 96 job1.setOutputValueClass(Text.class); 97 98 FileInputFormat.setInputPaths(job1, new Path("H:/大数据/mapreduce/sharedfriend/input")); 99 FileOutputFormat.setOutputPath(job1, new Path("H:/大数据/mapreduce/sharedfriend/output")); 100 101 boolean res1 = job1.waitForCompletion(true); 102 103 //第二阶段 104 Job job2 = Job.getInstance(conf); 105 job2.setJarByClass(SharedFriend.class); 106 job2.setMapperClass(SharedFriendMapper02.class); 107 job2.setReducerClass(SharedFriendReducer02.class); 108 109 job2.setOutputKeyClass(Text.class); 110 job2.setOutputValueClass(Text.class); 111 112 FileInputFormat.setInputPaths(job2, new Path("H:/大数据/mapreduce/sharedfriend/output")); 113 FileOutputFormat.setOutputPath(job2, new Path("H:/大数据/mapreduce/sharedfriend/output01")); 114 115 boolean res2 = job2.waitForCompletion(true); 116 117 System.exit(res1?0:1); 118 } 119 }
第一阶段输出结果
1 A F,I,O,K,G,D,C,H,B 2 B E,J,F,A 3 C B,E,K,A,H,G,F 4 D H,C,G,F,E,A,K,L 5 E A,B,L,G,M,F,D,H 6 F C,M,L,A,D,G 7 G M 8 H O 9 I O,C 10 J O 11 K O,B 12 L D,E 13 M E,F 14 O A,H,I,J,F