【问题标题】:How to compute edges between nodes v, w that are pointed to by the same node x如何计算同一节点 x 指向的节点 v、w 之间的边
【发布时间】:2015-05-19 12:25:17
【问题描述】:

这个问题是关于 Spark GraphX。给定一个任意图,我想计算一个新图,它在任意两个节点 v、w 之间添加边,这两个节点都由某个节点 x 指向。新边应包含指向节点作为属性。

即给定边 (x, v, nil) 和 (x, w, nil) 计算边 (v, w, x) 和 (w, v, x)。

它应该适用于任何图形,并且不需要我事先了解有关图形的任何信息,例如顶点 ID。

示例

[任务] 当被同一节点(例如 B)指向时,在节点(例如 A、C)之间添加两条有向边。

输入图:

          ┌────┐
    ┌─────│ B  │──────┐
    │     └────┘      │
    v                 v
 ┌────┐            ┌────┐
 │ A  │            │ C  │
 └────┘            └────┘
    ^                 ^
    │     ┌────┐      │
    └─────│ D  │──────┘
          └────┘

输出图(双向边 = 两条有向边):

          ┌────┐
    ┌─────│ B  │──────┐
    │     └────┘      │
    v                 v
 ┌────┐<───by B───>┌────┐
 │ A  │            │ C  │
 └────┘<───by D───>└────┘
    ^                 ^
    │     ┌────┐      │
    └─────│ D  │──────┘
          └────┘

如何优雅地编写返回输出图的 GraphX 查询?

【问题讨论】:

  • 您在输出图中的双箭头边没有意义。边有一个 src 和一个 dest - 在输出图中哪个是?
  • @DavidGriffin:您应该将其解读为两个有向边。我现在要更新一下这个问题。
  • 顺便说一句,我目前正在研究解决方案的 Pregel 版本。很高兴得到您的反馈。
  • 无意冒犯,但这不是很优雅!你需要它有多普遍?因为您可以只运行graph.edges.flatMap 并为每个边缘创建一个基于它的新边缘。这就是你所做的一切。
  • @DavidGriffin:也许我问的问题不够清楚。我(显然?)正在寻找一个通用解决方案,而不是该特定示例图的解决方案......换句话说,代码不应该要求我知道顶点 ID。我很遗憾没有在问题中写出来。

标签: scala graph spark-graphx


【解决方案1】:

这是一个使用预凝胶和聚合消息的解决方案

import org.apache.spark.graphx._
import org.apache.spark.rdd.RDD

// Step 0: Create an input graph.
val nodes =
  sc.parallelize(Array(
    (101L, "A"), (102L, "A"), (201L, "B"), (202L, "B")
  ))
val edges = 
  sc.parallelize(Array(
    Edge(201L, 101L, ("B-to-A", 0L)), Edge(201L, 102L, ("B-to-A", 0L)),
    Edge(202L, 101L, ("B-to-A", 0L)), Edge(202L, 102L, ("B-to-A", 0L))
  ))    
val graph = Graph(nodes, edges, "default")

// Step 1: Transform input graph before running pregel.
val initialGraph = graph.mapVertices((id, _) => Set[(VertexId,VertexId)]())

// Step 2: Send downstream vertex IDs (A's) to upstream vertices (B's)
val round1 = initialGraph.pregel(
  initialMsg=Set[(VertexId,VertexId)](), 
  maxIterations=1, 
  activeDirection=EdgeDirection.In) 
(
  (id, old, msg) => old.union(msg),
  triplet => Iterator((triplet.srcId, Set((triplet.dstId, triplet.srcId)))),
  (a,b) => a.union(b)
)

// Step 3: Send (gathered) IDs back to downstream vertices
val round2 = round1.aggregateMessages[Set[(VertexId,VertexId)]](
  triplet => {
    triplet.sendToDst(triplet.srcAttr)
  },
  (a, b) => a.union(b)
)

// Step 4: Transform vertices to edges
val newEdges = round2.flatMap {v => v._2.filter(w => w._1 != v._1).map(w => Edge(v._1, w._1, ("shares-with", w._2)))}

// Step 5: Create a new graph that contains new edges
val newGraph = Graph(graph.vertices, graph.edges.union(newEdges))

// Step 6: print graph to verify result
newGraph.triplets foreach println

此解决方案使用三个主要步骤来计算具有新边的图:1) 一轮预凝胶。 2) 一轮聚合消息。 3) 一轮映射节点到边。

【讨论】:

    猜你喜欢
    • 1970-01-01
    • 2018-02-14
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2013-09-17
    • 1970-01-01
    相关资源
    最近更新 更多