1 pagerank算法介绍

1.1 pagerank的假设

  数量假设:每个网页都会给它的链接网页投票,假设这个网页有n个链接,则该网页给每个链接平分投1/n票。

  质量假设:一个网页的pagerank值越大,则它的投票越重要。表现为将它的pagerank值作为它投票的加权值。

1.2 矩阵表示形式

  mapReduce编程之google pageRank

  .........

      mapReduce编程之google pageRank

最终PR值会收敛为稳定值。

1.3 deadends和spider traps

deadends:一个网页没有链接,则最终PR值会收敛为全为0;

spider traps:一个网页只有指向自身的链接,则最终PR值会收敛为该网页为1,其他全为0。

解决方法:

mapReduce编程之google pageRank

2 mapReduce流程

2.1 输入数据格式

mapReduce编程之google pageRank   

  mapReduce编程之google pageRank

2.2 总体流程

mapReduce编程之google pageRank

 

2.3 MR1

  maper1负责读入relation.txt,将数据分割为小单元,计算小单元的转移概率,以小单元的列号为key发送。

mapReduce编程之google pageRank

  maper2负责读入PR.txt,分割为小单元,按行号为key发送。

mapReduce编程之google pageRank

  reducer负责将接收到的pr值与转移概率值一一相乘,再乘以beta-1,然后按行号写入HDFS,

    mapReduce编程之google pageRank

2.4 MR2

  maper1从HDFS读入数据,发给reducer。

  maper2读取pr.txt,每个单元乘以beta后发往reducer。

  每个reducer将接收到的所有乘积相加,得到一行的结果。

    mapReduce编程之google pageRank

 

2.5 主要代码

UnitMultiplication.java
  1 import org.apache.hadoop.conf.Configuration;
  2 import org.apache.hadoop.fs.Path;
  3 import org.apache.hadoop.io.Text;
  4 import org.apache.hadoop.mapreduce.Job;
  5 import org.apache.hadoop.mapreduce.Mapper;
  6 import org.apache.hadoop.mapreduce.Reducer;
  7 import org.apache.hadoop.mapreduce.lib.chain.ChainMapper;
  8 import org.apache.hadoop.mapreduce.lib.input.MultipleInputs;
  9 import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
 10 import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
 11 
 12 import java.io.IOException;
 13 import java.util.ArrayList;
 14 import java.util.List;
 15 
 16 public class UnitMultiplication {
 17 
 18     public static class TransitionMapper extends Mapper<Object, Text, Text, Text> {
 19 
 20         @Override
 21         public void map(Object key, Text value, Context context) throws IOException, InterruptedException {
 22             String line = value.toString().trim();
 23             String[] fromTo = line.split("\t");
 24 
 25             if(fromTo.length == 1 || fromTo[1].trim().equals("")) {
 26                 return;
 27             }
 28             String from = fromTo[0];
 29             String[] tos = fromTo[1].split(",");
 30             for (String to: tos) {
 31                 context.write(new Text(from), new Text(to + "=" + (double)1/tos.length));
 32             }
 33         }
 34     }
 35 
 36     public static class PRMapper extends Mapper<Object, Text, Text, Text> {
 37 
 38         @Override
 39         public void map(Object key, Text value, Context context) throws IOException, InterruptedException {
 40             String[] pr = value.toString().trim().split("\t");
 41             context.write(new Text(pr[0]), new Text(pr[1]));
 42         }
 43     }
 44 
 45     public static class MultiplicationReducer extends Reducer<Text, Text, Text, Text> {
 46 
 47         float beta;
 48 
 49         @Override
 50         public void setup(Context context) {
 51             Configuration conf = context.getConfiguration();
 52             beta = conf.getFloat("beta", 0.2f);
 53         }
 54 
 55         @Override
 56         public void reduce(Text key, Iterable<Text> values, Context context)
 57                 throws IOException, InterruptedException {
 58             List<String> transitionUnit = new ArrayList<String>();
 59             double prUnit = 0;
 60             for (Text value: values) {
 61                 if(value.toString().contains("=")) {
 62                     transitionUnit.add(value.toString());
 63                 }
 64                 else {
 65                     prUnit = Double.parseDouble(value.toString());
 66                 }
 67             }
 68             for (String unit: transitionUnit) {
 69                 String outputKey = unit.split("=")[0];
 70                 double relation = Double.parseDouble(unit.split("=")[1]);
 71                 //transition matrix * pageRank matrix * (1-beta)
 72                 String outputValue = String.valueOf(relation * prUnit * (1-beta));
 73                 context.write(new Text(outputKey), new Text(outputValue));
 74             }
 75         }
 76     }
 77 
 78     public static void main(String[] args) throws Exception {
 79 
 80         Configuration conf = new Configuration();
 81         conf.setFloat("beta", Float.parseFloat(args[3]));
 82         Job job = Job.getInstance(conf);
 83         job.setJarByClass(UnitMultiplication.class);
 84 
 85         ChainMapper.addMapper(job, TransitionMapper.class, Object.class, Text.class, Text.class, Text.class, conf);
 86         ChainMapper.addMapper(job, PRMapper.class, Object.class, Text.class, Text.class, Text.class, conf);
 87 
 88         job.setReducerClass(MultiplicationReducer.class);
 89 
 90         job.setOutputKeyClass(Text.class);
 91         job.setOutputValueClass(Text.class);
 92 
 93         MultipleInputs.addInputPath(job, new Path(args[0]), TextInputFormat.class, TransitionMapper.class);
 94         MultipleInputs.addInputPath(job, new Path(args[1]), TextInputFormat.class, PRMapper.class);
 95 
 96         FileOutputFormat.setOutputPath(job, new Path(args[2]));
 97         job.waitForCompletion(true);
 98     }
 99 
100 }
View Code

相关文章:

  • 2021-12-03
  • 2022-12-23
  • 2021-06-06
  • 2021-09-24
  • 2022-12-23
猜你喜欢
  • 2022-12-23
  • 2021-11-12
  • 2022-12-23
  • 2022-12-23
  • 2021-12-20
  • 2022-12-23
  • 2021-04-19
相关资源
相似解决方案