【问题标题】:Hadoop Map Reduce For Google web graphHadoop Map Reduce 用于 Google 网络图
【发布时间】:2013-12-25 15:25:10
【问题描述】:

我们被分配了创建 map reduce 函数的任务,该函数将为 google web 图表中的每个节点 n 输出您可以在 3 跳内从节点 n 到达的节点。 (实际数据可以在这里找到:http://snap.stanford.edu/data/web-Google.html) 这是列表中项目的示例:

1 2 
1 3 
2 4 
3 4 
3 5 
4 1 
4 5 
4 6 
5 6 

从上面的示例图表将是这个

在上面的简化示例中,节点 1 的示例路径是 α [1 -> 2 -> 4 -> 1], [1 -> 2 -> 4 -> 5], [1 -> 2 -> 4 -> 6], [1 -> 3 -> 4 -> 1], [1 -> 3 -> 4 -> 5], [1 -> 3 -> 4 -> 6] και [1 -> 3 -> 5 -> 6] 因此 map reduce 将为节点 1 输出顶点 1,5,6 ( (a) 每个顶点只能计算一次,并且 (b) 仅当存在长度为 3 的圆形路径时才包含当前顶点,例如 [1 -> 2 -> 4 -> 1] 和 [1 -> 3 -> 4 -> 1]。 /p>

我很迷茫,因为我认为这需要图论和算法的知识,而我们还没有学到任何与此相关的知识。

如果有人能给我一个正确的开始方向,我将不胜感激。 (我已经研究过最短路径理论等,但我不确定它是否对这个特定的练习有用)

提前致谢,祝您假期愉快。

编辑

我尝试创建附属列表,但是虽然我希望输出是“vertexID”“node1 node2 node3 node4...”,但我看到在输出文件中我的reducer将每个顶点ID的列表拆分为一对三。

例如,如果我有连接到 Z、X、C、V、B、N、M、G、H、J、K、L 的顶点 A,它会将其输出为

A Z,X,C

A V,B,N

A M,G,H

A J,K,L

下面是我的映射器和reducer

public class AdjacentsListDriver extends Configured implements Tool {

    @Override
    public int run(String[] args) throws Exception {



        Configuration conf = getConf();
        Job job = Job.getInstance(conf);
        job.setJobName("Test driver");
        job.setJarByClass(AdjacentsListDriver.class);

        String[] arg0 = new GenericOptionsParser(conf, args).getRemainingArgs();
        if (arg0.length != 2) {
            System.err.println("Usage: hadoop jar <my_jar> <input_dir> <output_dir>");
            System.exit(1);
        }

        Path in = new Path(arg0[0]);
        Path out = new Path(arg0[1]);

        FileInputFormat.setInputPaths(job, in);
        FileOutputFormat.setOutputPath(job, out);

        job.setMapperClass(ListMapper.class);
        job.setReducerClass(ListReducer.class);

        job.setInputFormatClass(TextInputFormat.class);
        job.setOutputFormatClass(TextOutputFormat.class);

        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);    
        job.waitForCompletion(true);

        return 0;
    }

    public static void main(String[] args) throws Exception {
        int res = ToolRunner.run(new Configuration(), new AdjacentsListDriver(), args);
        System.exit(res);

    }



}





/**
 * @author George
 * Theoretically this takes a key(vertexID) and maps all nodes that are connected to it in one hop....
 *
 */
public class ListMapper extends Mapper<LongWritable, Text, Text, Text> {

    private Text vertexID = new Text();
    //private LongWritable vertice= new LongWritable(0);
    private Text vertice=new Text();

    public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {

        String line = value.toString();
        StringTokenizer itr = new StringTokenizer(line,"\n");
        StringTokenizer itrInside;

        //vertice=new LongWritable(Long.valueOf(value.toString()).longValue());


        while (itr.hasMoreTokens()) {
            if(itr.countTokens() > 2){

            }//ignore first line ??
            else{
                itrInside=new StringTokenizer(itr.toString());
                vertexID.set(itr.nextToken());

                while(itrInside.hasMoreTokens()){               
                    vertice.set(itrInside.nextToken());
                    context.write(vertexID, value);
                }           
            }
        }

    }

}

@override
public class ListReducer extends Reducer<Text, Text, Text, Text>{
    public void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException {

        String vertices="";

        for (Text value : values) {
            if(vertices=="")
                vertices+=value.toString();
            else
                vertices=vertices+","+value.toString();         
        }

        Text value=new Text();
        value.set(vertices);
        context.write(key, value);

    }

}

【问题讨论】:

    标签: java hadoop graph mapreduce


    【解决方案1】:

    由于这是您的(家庭作业)作业,因此我不会包含 Java/Hadoop 解决方案,但我会尝试让您更清楚地了解使用 MapReduce 进行图形计算的概念,以便您自己实现。

    对于每个顶点,您想要 恰好 n 跳的顶点。在查看最短路径算法时,您走在了正确的道路上,但是通过简单的广度优先搜索可以更轻松地解决它。

    但是,当使用 MapReduce 处理图形时,您需要更深入地研究顶点之间的消息传递。图算法通常用多个作业来表示,其中 map 和 reduce 阶段具有以下分配:

    • Mapper:向另一个顶点发送消息(通常针对顶点的每个邻居)
    • Reducer:对传入消息进行分组,加入核心图并减少它们,有时会发送更多消息。

    每个作业都将始终对前一个作业的输出进行操作,直到您达到结果或放弃为止。

    数据准备

    在您真正想要运行图形算法之前,请确保您的数据采用邻接列表的形式。它将使以下迭代算法更容易实现。

    因此,您需要按顶点 ID 对它们进行分组,而不是您的邻接元组。这是一些伪代码:

    map input tuple (X, Y): 
       emit (X, Y)
    
    reduce input (X, Y[]) :
      emit (X, Y[])
    

    基本上,您只是按顶点 ID 进行分组,因此您的输入数据是其邻居的键(顶点 ID)(可以从该特定键顶点 ID 访问的顶点 ID 列表)。如果想节省资源,可以使用reducer作为combiner。

    算法

    就像我已经提到的,您只需要广度优先搜索算法。您将为图中的每个顶点执行广度优先搜索算法,当遇到邻居时,您将只增加一个跳跃计数器,告诉我们距离起始顶点有多远(这是最短路径算法的最简单情况,即当边缘权重为 1)。

    让我给你看一张简单的图片,用一个简单的图表来描述它。橙色表示已访问,蓝色表示未访问,绿色是我们的结果。括号中是跳数计数器。

    你看,在每次迭代中,我们都会为每个顶点设置一个 hopcounter。如果我们碰到一个新的顶点,我们只会将它加一。如果我们命中第 n 个顶点,我们会以某种方式标记它以供以后查找。

    使用 MapReduce 分发

    虽然对每个顶点运行广度优先搜索确实很浪费,但我们可以通过并行化来做得更好。消息传递到这里。就像上图一样,我们在映射步骤中获得的每个顶点最初都会向其邻居发送一条消息,其中包含以下有效负载:

    HopMessage: Origin (VertexID) | HopCounter(Integer)
    

    在第一次迭代中,我们将尝试将消息发送给邻居以开始计算。否则,我们将只代理图表或传入消息。

    因此,在您准备好数据后的第一份工作中,您的 map 和 reduce 看起来像这样:

    map input (VertexID key, either HopMessage or List<VertexID> adjacents):
      if(iterationNumber == 1): // only in the first iteration to kick off
         for neighbour in adjacents:
            emit (neighbour, new HopMessage(key, 0))
      emit (key, adjacents or HopMessage) // for joining in the reducer
    

    reducer 现在在图和消息之间进行简单的连接,主要是为了获取顶点的邻居,从而导致输入(以我的简单图为例):

    1 2 // graph 
    2 1 // hop message
    2 3 // graph
    3 1 // hop message
    3 4 // graph
    4 1 // hop message
    4 - // graph
    

    在 reducer 步骤中,我们将再次将消息转发给邻居,并检查增加后的 hop counter 是否已经达到 3。

    reducer input(VertexID key, List<either HopMessage or List<VertexID> neighbours> values):
     for hopMessage in values:
        hopMessage.counter += 1
        if (hopMessage.counter == 3) 
           emit to some external resource (HopMessage.origin, key)
        else 
           for neighbour in neighbours of key:
              emit (neighbour, hopMessage)
        emit (key, neighbours)
    

    正如您所看到的,这里可能会变得非常混乱:您需要管理两种不同类型的消息,然后还要写入一些外部资源,以跟踪正好 3 跳外的顶点​​。

    只要有要发送的 HopMessage,您就可以安排迭代的作业。这很容易出现图表中的循环问题,因为在这种情况下您将无限增加 hopcounter。所以我建议要么用每条消息发送到目前为止的完整遍历路径(相当浪费),要么简单地限制要进行的迭代次数。在 n=3 的情况下,不需要超过 3 次作业迭代。

    有很多博客和项目将为您提供有关如何处理 Hadoop 中的每个问题的示例。至少我在我的博客中写过关于 MapReduce 中的图形处理的文章,你可以在我的 github 上找到一些示例。

    清理输出数据

    最后你会得到一堆包含顶点 -> 顶点映射的文件。您可以像在准备中那样减少它们。

    使用 Pregel 的更好方法

    处理图的一种不那么繁琐的方法是使用 Pregel 方式来表达图计算。 Pregel 正在使用以顶点为中心的模型,并且更容易表达这种广度优先的计算。

    这是使用Apache Hama的上述算法的一个简单示例:

      public static class NthHopVertex extends Vertex<Text, NullWritable, HopMessage> {
    
        @Override
        public void compute(Iterable<HopMessage> messages) throws IOException {
          if (getSuperstepCount() == 0L) {
            HopMessage msg = new HopMessage();
            msg.origin = getVertexID().toString();
            msg.hopCounter = 0;
            sendMessageToNeighbors(msg);
          } else {
            for (HopMessage message : messages) {
              message.hopCounter += 1;
              if (message.hopCounter == 3) {
                getPeer().write(new Text(message.origin), getVertexID());
                voteToHalt();
              } else {
                sendMessageToNeighbors(message);
              }
    
            }
          }
        }
      }
    

    顺便说一句,在您的示例中创建的新图表如下所示:

    1=[1, 5, 6]
    2=[2, 3, 6]
    3=[2, 3, 6]
    4=[4, 5]
    

    这是完整的 Hama Graph 实现:

    https://github.com/thomasjungblut/tjungblut-graph/blob/master/src/de/jungblut/graph/bsp/NthHop.java

    【讨论】:

    • 非常感谢您的回复:)。我会尽快调查,如果我再次遇到困难,我会在这里回复:)
    • 我已经开始使用代码,但我从 step1 遇到了一个小“问题”。我已经编辑了我的问题以包含新数据。我尝试创建附属列表,但事情并没有完全按照我的想法进行
    • @George 你有错误的通用声明。只需将@Override 注释添加到reduce 方法,这会导致编译器错误。你的扩展应该看起来像extends Reducer&lt;Text, Text, Text, Text&gt;
    • 感谢您的回复。我添加了@ovverride 注释,并将 IntWritable 替换为扩展中的文本。不幸的是,它仍然以与我上面提到的相同的方式输出。 (我认为这可能是因为我使用 Text 而不是 LonngWritable 作为键和值类型)
    • 我也想问一下“HopMessage 或 List”。我知道这是伪代码。 HopeMessage 和 VertexID 是需要创建的类。令人困惑的部分是“两者”。我不认为 java 有一个类可以做到这一点,所以我的猜测是我将不得不编写一个也决定该部分的类。或者有没有基于 hadoop 的框架提供这样的实现?
    猜你喜欢
    • 2012-07-07
    • 2023-03-19
    • 1970-01-01
    • 1970-01-01
    • 2011-07-21
    • 2014-03-22
    • 1970-01-01
    • 1970-01-01
    • 2016-10-25
    相关资源
    最近更新 更多