【问题标题】:applying a function to graph data using mapReduceTriplets in spark and graphx在 spark 和 graphx 中使用 mapReduceTriplets 将函数应用于图形数据
【发布时间】:2015-05-27 11:20:23
【问题描述】:

我在使用 graphx 将 mapReduceTriplets 应用到我的图网络时遇到了一些问题。

我一直在关注教程并读入我自己的数据,这些数据被放在一起作为 [Array[String],Int],所以例如我的顶点是:

org.apache.spark.graphx.VertexRDD[Array[String]] 例如(3999,Array(17, Low, 9))

我的优势是:

org.apache.spark.graphx.EdgeRDD[Int] 例如边(3999,4500,1)

我正在尝试使用 mapReduceTriplets 应用聚合类型函数,该函数计算顶点数组中的最后一个整数(在上面的示例 9 中)与第一个整数(在上面的示例中)相同或不同的数量17) 所有连接的顶点。

因此,您最终会得到一个匹配或不匹配数量的计数列表。

我遇到的问题是使用 mapReduceTriplets 应用任何函数,我对 scala 很陌生,所以这可能非常明显,但在 graphx 教程中,它有一个示例使用格式为 Graph[Double, Int ],但是我的图表是 Graph[Array[String],Int] 的格式,所以我只是尝试作为第一步来弄清楚如何在示例中使用我的图表,然后从那里开始工作。

graphx网站上的例子如下:

    val olderFollowers: VertexRDD[(Int, Double)] = graph.mapReduceTriplets[(Int, Double)](
  triplet => { // Map Function
    if (triplet.srcAttr > triplet.dstAttr) {
      // Send message to destination vertex containing counter and age
      Iterator((triplet.dstId, (1, triplet.srcAttr)))
    } else {
      // Don't send a message for this triplet
      Iterator.empty
    }
  },
  // Add counter and age
  (a, b) => (a._1 + b._1, a._2 + b._2) // Reduce Function
)

任何建议都将不胜感激,或者如果您认为有比使用 mapreducetriplets 更好的方法,我会很高兴听到它。

修改了新代码

val nodes = (sc.textFile("C~nodeData.csv")
.map(line => line.split(",")).map( parts => (parts.head.toLong, parts.tail) ))

val edges = GraphLoader.edgeListFile(sc, "C:~edges.txt")


val graph = edges.outerJoinVertices(nodes) {
case (uid, deg, Some(attrList)) => attrList
case (uid, deg, None) => Array.empty[String]
}


val countsRdd = graph.collectNeighbors(EdgeDirection.Either).leftOuterJoin(graph.vertices).map {
  case (id, t) => {
    val neighbors: Array[(VertexId, Array[String])] = t._1
    val nodeAttr = (t._2)
    neighbors.map(_._2).count( x => x.apply(x.size - 1) == nodeAttr(0))

  }
}

【问题讨论】:

  • 什么版本的 Spark?在最新版本的 Spark 中,mapReduceTriplets 已替换为 aggregateMessages
  • 版本 1.3.0,aggregateMessages 是否以类似的方式工作?
  • 看看过渡指南:spark.apache.org/docs/latest/…

标签: scala network-programming apache-spark graph-algorithm spark-graphx


【解决方案1】:

我认为您想使用GraphOps.collectNeighbors 而不是mapReduceTripletsaggregateMessages

collectNeighbors 将为您提供一个 RDD,对于图中的每个 VertexId,将连接的节点作为一个数组。只需根据您的需要减少阵列。比如:

val countsRdd = graph.collectNeighbors(EdgeDirection.Either)
  .join(graph.vertices)
  .map{ case (vid,t) => {
    val neighbors = t._1
    val nodeAttr = t._2
    neighbors.map(_._2).filter( <add logic here> ).size
  }

如果这不能让你朝着正确的方向前进,或者你被卡住了,请告诉我(例如“”部分)。

【讨论】:

  • 谢谢大卫,这看起来很有帮助,我会试一试,如果我遇到任何问题,请告诉你。
  • 如果它对你有用,记得回来接受我的回答和/或投票! :)
  • 再次感谢您的帮助,尽管我在编写逻辑参数时遇到了一些问题,并且可能走错了路。我在想什么(我现在知道这是错误的)在下面,其中 x 是从邻居获取的 Array[String] (因此包含在源节点中的信息)。我认为这只是比较数组,而不是其中的组件,尽管我似乎无法访问单个组件元素?可能是完全错误的... neighbors.map(_._2).filter(x=> x== nodeAttr)
  • 正如您所指出的,您正在尝试比较整个数组。对于您想要的逻辑,您需要使用x.apply(x.size - 1) == nodeAttr(0) 之类的东西选择最后一个值这会将x 的最后一个元素与nodeAttr 的第一个元素进行比较
  • 再次感谢您的建议,我似乎一直遇到 nodeAttr(0) 的问题,但我收到错误消息 Option[Array[String]] does not take parameters, for some reason我似乎无法访问数组的任何单个组件。
猜你喜欢
  • 1970-01-01
  • 1970-01-01
  • 2015-10-26
  • 2016-05-20
  • 1970-01-01
  • 1970-01-01
  • 2022-06-10
  • 2018-09-28
  • 1970-01-01
相关资源
最近更新 更多