问题导读

1. GraphX 提供了哪几种方式从 RDD 或磁盘上的顶点和边集合构造图？
2. PageRank 算法在图中发挥什么作用？
3. 三角形计数算法的作用是什么？


  /**
   * Simplified sketch of the Pregel operator that GraphX exposes on `GraphOps`.
   */
  class GraphOps[VD, ED] {
    /**
     * Runs a Pregel-style (bulk-synchronous message passing) computation over the graph.
     *
     * The two parameter lists separate the configuration (initial message, iteration bound,
     * active direction) from the user-supplied functions, which helps Scala infer `A`.
     *
     * @param initialMsg message every vertex receives before the first iteration
     * @param maxIter    maximum number of message-passing iterations to run
     * @param activeDir  direction of the edges, relative to vertices that received a message,
     *                   on which `sendMsg` is invoked in the next iteration
     * @param vprog      vertex program: computes a vertex's new value from its id, current
     *                   value and merged incoming message
     * @param sendMsg    produces (target vertex id, message) pairs for an edge triplet
     * @param mergeMsg   combines two messages addressed to the same vertex
     * @return the graph once no messages remain or `maxIter` iterations have run
     */
    def pregel[A]
        (initialMsg: A,
         maxIter: Int = Int.MaxValue,
         activeDir: EdgeDirection = EdgeDirection.Out)
        (vprog: (VertexId, VD, A) => VD,
         sendMsg: EdgeTriplet[VD, ED] => Iterator[(VertexId, A)],
         mergeMsg: (A, A) => A)
      : Graph[VD, ED] = {
      // Receive the initial message at each vertex.
      var g = mapVertices((vid, vdata) => vprog(vid, vdata, initialMsg)).cache()
      // Compute the first round of messages. Cache them because they are counted here and
      // then joined against in the first iteration.
      var messages = g.mapReduceTriplets(sendMsg, mergeMsg).cache()
      var activeMessages = messages.count()
      // Loop until no messages remain or maxIter iterations have run.
      var i = 0
      while (activeMessages > 0 && i < maxIter) {
        // Receive the messages: ---------------------------------------------------------------
        // Run the vertex program on all vertices that received a message.
        val newVerts = g.vertices.innerJoin(messages)(vprog).cache()
        // Merge the new vertex values back into the graph; keep the old graph so its cached
        // data can be released once the new one has been materialized.
        val prevG = g
        g = g.outerJoinVertices(newVerts) { (vid, old, newOpt) => newOpt.getOrElse(old) }.cache()
        // Send messages: ----------------------------------------------------------------------
        // Vertices that didn't receive a message above don't appear in newVerts and therefore
        // don't get to send messages. More precisely, the map phase of mapReduceTriplets is only
        // invoked on edges in the activeDir of vertices in newVerts.
        val oldMessages = messages
        messages = g.mapReduceTriplets(sendMsg, mergeMsg, Some((newVerts, activeDir))).cache()
        // count() materializes the new messages (and, through them, newVerts and g), so the
        // previous iteration's cached data is no longer needed.
        activeMessages = messages.count()
        // Release the cached data of the previous iteration; otherwise every iteration's
        // cached RDDs would accumulate until the computation finishes.
        oldMessages.unpersist(blocking = false)
        newVerts.unpersist(blocking = false)
        prevG.unpersistVertices(blocking = false)
        prevG.edges.unpersist(blocking = false)
        i += 1
      }
      g
    }
  }
  • 复制代码

    相关文章：