1 pagerank算法介绍
1.1 pagerank的假设
数量假设:每个网页都会给它的链接网页投票,假设这个网页有n个链接,则该网页给每个链接平分投1/n票。
质量假设:一个网页的pagerank值越大,则它的投票越重要。表现为将它的pagerank值作为它投票的加权值。
1.2 矩阵表示形式
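设转移矩阵为 $M$:若网页 $j$ 共有 $n$ 个链接且其中一个指向网页 $i$,则 $M_{ij} = 1/n$,否则 $M_{ij} = 0$。PR 向量按 $PR_{k+1} = M \cdot PR_k$ 反复迭代,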
最终PR值会收敛为稳定值。
1.3 deadends和spider traps
dead ends:如果某个网页没有任何出链(不指向任何其他网页),则最终所有网页的PR值都会收敛为0;
spider traps:一个网页只有指向自身的链接,则最终PR值会收敛为该网页为1,其他全为0。
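解决方法:引入系数 $\beta$,每轮迭代改为 $PR_{k+1} = (1-\beta)\, M \cdot PR_k + \beta \cdot PR_k$(下文 MR1 计算第一项,MR2 加上第二项并求和)。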
2 mapReduce流程
2.1 输入数据格式
2.2 总体流程
2.3 MR1
mapper1负责读入relation.txt,将数据分割为小单元,计算小单元的转移概率,以小单元的列号为key发送。
mapper2负责读入PR.txt,分割为小单元,以行号为key发送。
reducer负责将接收到的pr值与转移概率值一一相乘,再乘以(1-beta),然后按行号写入HDFS。
2.4 MR2
mapper1从HDFS读入MR1的输出数据,发给reducer。
mapper2读取pr.txt,每个单元乘以beta后发往reducer。
每个reducer将接收到的所有乘积相加,得到一行的结果。
2.5 主要代码
UnitMultiplication.java
1 import org.apache.hadoop.conf.Configuration; 2 import org.apache.hadoop.fs.Path; 3 import org.apache.hadoop.io.Text; 4 import org.apache.hadoop.mapreduce.Job; 5 import org.apache.hadoop.mapreduce.Mapper; 6 import org.apache.hadoop.mapreduce.Reducer; 7 import org.apache.hadoop.mapreduce.lib.chain.ChainMapper; 8 import org.apache.hadoop.mapreduce.lib.input.MultipleInputs; 9 import org.apache.hadoop.mapreduce.lib.input.TextInputFormat; 10 import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; 11 12 import java.io.IOException; 13 import java.util.ArrayList; 14 import java.util.List; 15 16 public class UnitMultiplication { 17 18 public static class TransitionMapper extends Mapper<Object, Text, Text, Text> { 19 20 @Override 21 public void map(Object key, Text value, Context context) throws IOException, InterruptedException { 22 String line = value.toString().trim(); 23 String[] fromTo = line.split("\t"); 24 25 if(fromTo.length == 1 || fromTo[1].trim().equals("")) { 26 return; 27 } 28 String from = fromTo[0]; 29 String[] tos = fromTo[1].split(","); 30 for (String to: tos) { 31 context.write(new Text(from), new Text(to + "=" + (double)1/tos.length)); 32 } 33 } 34 } 35 36 public static class PRMapper extends Mapper<Object, Text, Text, Text> { 37 38 @Override 39 public void map(Object key, Text value, Context context) throws IOException, InterruptedException { 40 String[] pr = value.toString().trim().split("\t"); 41 context.write(new Text(pr[0]), new Text(pr[1])); 42 } 43 } 44 45 public static class MultiplicationReducer extends Reducer<Text, Text, Text, Text> { 46 47 float beta; 48 49 @Override 50 public void setup(Context context) { 51 Configuration conf = context.getConfiguration(); 52 beta = conf.getFloat("beta", 0.2f); 53 } 54 55 @Override 56 public void reduce(Text key, Iterable<Text> values, Context context) 57 throws IOException, InterruptedException { 58 List<String> transitionUnit = new ArrayList<String>(); 59 double prUnit = 0; 60 for (Text value: 
values) { 61 if(value.toString().contains("=")) { 62 transitionUnit.add(value.toString()); 63 } 64 else { 65 prUnit = Double.parseDouble(value.toString()); 66 } 67 } 68 for (String unit: transitionUnit) { 69 String outputKey = unit.split("=")[0]; 70 double relation = Double.parseDouble(unit.split("=")[1]); 71 //transition matrix * pageRank matrix * (1-beta) 72 String outputValue = String.valueOf(relation * prUnit * (1-beta)); 73 context.write(new Text(outputKey), new Text(outputValue)); 74 } 75 } 76 } 77 78 public static void main(String[] args) throws Exception { 79 80 Configuration conf = new Configuration(); 81 conf.setFloat("beta", Float.parseFloat(args[3])); 82 Job job = Job.getInstance(conf); 83 job.setJarByClass(UnitMultiplication.class); 84 85 ChainMapper.addMapper(job, TransitionMapper.class, Object.class, Text.class, Text.class, Text.class, conf); 86 ChainMapper.addMapper(job, PRMapper.class, Object.class, Text.class, Text.class, Text.class, conf); 87 88 job.setReducerClass(MultiplicationReducer.class); 89 90 job.setOutputKeyClass(Text.class); 91 job.setOutputValueClass(Text.class); 92 93 MultipleInputs.addInputPath(job, new Path(args[0]), TextInputFormat.class, TransitionMapper.class); 94 MultipleInputs.addInputPath(job, new Path(args[1]), TextInputFormat.class, PRMapper.class); 95 96 FileOutputFormat.setOutputPath(job, new Path(args[2])); 97 job.waitForCompletion(true); 98 } 99 100 }