【问题标题】:Shortest path performance in Graphx with Spark使用 Spark 在 Graphx 中的最短路径性能
【发布时间】:2017-05-25 06:37:05
【问题描述】:

我正在从edgevertices 类型的gz 压缩json 文件创建一个图表。

我已将文件放入保管箱文件夹here

我加载并映射这些json 记录以创建graphx 所需的verticesedge 类型,如下所示:

val vertices_raw = sqlContext.read.json("path/vertices.json.gz")
val vertices = vertices_raw.rdd.map(row=> ((row.getAs[String]("toid").stripPrefix("osgb").toLong),row.getAs[Long]("index")))
val verticesRDD: RDD[(VertexId, Long)] = vertices
val edges_raw = sqlContext.read.json("path/edges.json.gz")
val edgesRDD = edges_raw.rdd.map(row=>(Edge(row.getAs[String]("positiveNode").stripPrefix("osgb").toLong, row.getAs[String]("negativeNode").stripPrefix("osgb").toLong, row.getAs[Double]("length"))))
val my_graph: Graph[(Long),Double] = Graph.apply(verticesRDD, edgesRDD).partitionBy(PartitionStrategy.RandomVertexCut)

然后我使用我发现的这个dijkstra 实现来计算两个顶点之间的最短路径:

def dijkstra[VD](g: Graph[VD, Double], origin: VertexId) = {
          var g2 = g.mapVertices(
        (vid, vd) => (false, if (vid == origin) 0 else Double.MaxValue, List[VertexId]())
          )
          for (i <- 1L to g.vertices.count - 1) {
            val currentVertexId: VertexId = g2.vertices.filter(!_._2._1)
              .fold((0L, (false, Double.MaxValue, List[VertexId]())))(
                (a, b) => if (a._2._2 < b._2._2) a else b)
              ._1

            val newDistances: VertexRDD[(Double, List[VertexId])] =
              g2.aggregateMessages[(Double, List[VertexId])](
            ctx => if (ctx.srcId == currentVertexId) {
              ctx.sendToDst((ctx.srcAttr._2 + ctx.attr, ctx.srcAttr._3 :+ ctx.srcId))
            },
            (a, b) => if (a._1 < b._1) a else b
          )
        g2 = g2.outerJoinVertices(newDistances)((vid, vd, newSum) => {
          val newSumVal = newSum.getOrElse((Double.MaxValue, List[VertexId]()))
          (
            vd._1 || vid == currentVertexId,
            math.min(vd._2, newSumVal._1),
            if (vd._2 < newSumVal._1) vd._3 else newSumVal._2
            )
        })
        }

          g.outerJoinVertices(g2.vertices)((vid, vd, dist) =>
        (vd, dist.getOrElse((false, Double.MaxValue, List[VertexId]()))
          .productIterator.toList.tail
          ))
        }

我取两个随机顶点 id:

val v1 = 4000000028222916L
val v2 = 4000000031019012L

并计算它们之间的路径:

val results = dijkstra(my_graph, v1).vertices.map(_._2).collect

如果没有出现 stackoverflow 错误,我无法在我的笔记本电脑上进行本地计算。我可以看到它使用了 4 个可用内核中的 3 个。我可以在完全相同的图上加载此图并使用 Python 中的 igraph 库每秒计算最短 10 条路径。这是计算路径的低效方法吗?在规模上,将在多个节点上计算路径(没有 stackoverflow 错误),但每个路径计算仍然需要 30/40 秒。

【问题讨论】:

    标签: json apache-spark spark-graphx


    【解决方案1】:

    正如您在python-igraph github 上看到的那样

    “它旨在尽可能强大(即快速)以启用 大图分析。”

    为了解释为什么在 apache-spark 上花费的时间比在本地 python 上多 4000 倍,您可以查看 here(与 Spark PMC 成员 Kay Ousterhout 一起深入研究性能瓶颈。)以了解它可能是由于瓶颈:

    ...从网络和磁盘 I/O 是主要瓶颈的想法开始... 您可能不需要将数据存储在内存中,因为作业可能不会那么快。这就是说,如果您将序列化的压缩数据从磁盘移动到内存中......

    您可能还会看到herehere 一些信息,但最好的最终方法是对您的代码进行基准测试以了解瓶颈在哪里

    【讨论】:

      猜你喜欢
      • 2021-12-23
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      相关资源
      最近更新 更多