【问题标题】:how to attach properties to vertices in a graphx and retrieve the neighbourhood如何将属性附加到graphx中的顶点并检索邻域
【发布时间】:2015-12-09 20:41:31
【问题描述】:

我对 Spark 和 Scala 比较陌生...我有一个图:Graph[Int, String],我想将我在 DataFrame 中的一些属性附加到这些顶点。

我需要做的是,对于每个顶点,找到每个属性在邻域中的平均值。到目前为止,这是我的方法,但我不明白如何正确映射从两个数据框的连接中获得的 Row:

val res = graph.collectNeighbors(EdgeDirection.Either)
         .toDF("ID", "neighbours")
         .join(aDataFrameWithProperties, "ID")
         .map{x => // this is where I am lost
         }

我认为我的方法不正确,因为我将每个顶点的属性与其邻居的数组连接起来,但我仍然不知道邻居的属性值...

编辑

一些有助于理解我想要完成的数据...假设您按照how to create EdgeRDD from data frame in Spark 的答案构建图表

val sqlc : SQLContext = ???

case class Person(id: Long, country: String, age: Int)

val testPeople = Seq(
   Person(1, "Romania"    , 15),
   Person(2, "New Zealand", 30),
   Person(3, "Romania"    , 17),
   Person(4, "Iceland"    , 20),
   Person(5, "Romania"    , 40),
   Person(6, "Romania"    , 44),
   Person(7, "Romania"    , 45),
   Person(8, "Iceland"    , 21),
   Person(9, "Iceland"    , 22)
 )

 val people = sqlc.createDataFrame(testPeople)
 val peopleR = people
   .withColumnRenamed("id"     , "idR")
   .withColumnRenamed("country", "countryR")
   .withColumnRenamed("age"    , "ageR")

 import org.apache.spark.sql.functions._

 val relations = people.join(peopleR,
       (people("id") < peopleR("idR")) &&
         (people("country") === peopleR("countryR")) &&
         (abs(people("age") - peopleR("ageR")) < 5))

 import org.apache.spark.graphx._

 val edges = EdgeRDD.fromEdges(relations.map(row => Edge(
       row.getAs[Long]("id"), row.getAs[Long]("idR"), ())))

 val users = VertexRDD.apply(people.map(row => (row.getAs[Int]("id").toLong, row.getAs[Int]("id").toInt)))

 val graph = Graph(users, edges)

然后你有一个像这样的数据框:

case class Person(id:Long, gender:Int, income:Int)
val properties = Seq(
  Person(1, 0, 321),
  Person(2, 1, 212),
  Person(3, 0, 212),
  Person(4, 0, 122),
  Person(5, 1, 898),
  Person(6, 1, 212),
  Person(7, 1, 22),
  Person(8, 0, 8),
  Person(9, 0, 212)
)

val people = sqlc.createDataFrame(properties)

我想计算每个顶点的平均性别是多少,邻居的平均收入是多少,以 DataFrame 形式返回

【问题讨论】:

  • 为什么要将属性与图表分开?这里的预期输出到底是什么?
  • 当我构建图表时,我有节点但我还没有属性...我可以“附加”属性吗?预期的结果是一个带有 ID 列和每个属性的一列的 DataFrame
  • 所以您不想要图表?这听起来像是 aggregateMessages 而不是 DataFrames 的工作。无论如何,你能显示一些数据吗?
  • 不,我不想返回图表,如果这是你的意思...我会在编辑中放入一些数据

标签: scala apache-spark spark-graphx


【解决方案1】:

一般来说,您应该使用图形运算符而不是将所有内容都转换为 DataFrame,但这样的事情应该可以解决问题:

import org.apache.spark.sql.functions.{explode, avg}

val statsDF = graph.collectNeighbors(EdgeDirection.Either)
  .toDF("ID", "neighbours")
  // Flatten neighbours column
  .withColumn("neighbour", explode($"neighbours"))
  // and extract neighbour id
  .select($"ID".alias("this_id"), $"neighbour._1".alias("other_id"))
  // join with people 
  .join(people, people("ID") === $"other_id")
  .groupBy($"this_id")
  .agg(avg($"gender"), avg($"income"))

如果我想计算性别 = 我自己的性别的邻居数量而不是平均值,然后找到所有连接的平均值

为此,您需要两个单独的连接 - 一个在 this_id 上,一个在 ohter_id 上。接下来,您可以简单地使用以下表达式进行聚合:

avg((this_gender === other_gender).cast("integer"))

关于图形运算符,您可以使用一些操作。对于初学者,您可以使用连接操作向顶点添加属性:

val properties: RDD[(VertexId, (Int, Int))] = sc.parallelize(Seq(
  (1L, (0, 321)), (2L, (1, 212)), (3L, (0, 212)),
  (4L, (0, 122)), (5L, (1, 898)), (6L, (1, 212)),
  (7L, (1, 22)), (8L, (0, 8)), (9L, (0, 212))
))

val graphWithProperties = graph
  .outerJoinVertices(properties)((_, _, prop) => prop)
  // For simplicity this assumes no missing values 
  .mapVertices((_, props) => props.get) 

接下来我们可以聚合消息来创建新的VertexRDD

val neighboursAggregated = graphWithProperties
  .aggregateMessages[(Int, (Int, Int))](
    triplet => {
      triplet.sendToDst(1, triplet.srcAttr)
      triplet.sendToSrc(1, triplet.dstAttr)
    },
    {case ((cnt1, (age1, inc1)), (cnt2, (age2, inc2))) =>
      (cnt1 + cnt2, (age1 + age2, inc1 + inc2))}
  )

最后我们可以替换现有的属性:

graphWithProperties.outerJoinVertices(neighboursAggregated)(
  (_, oldProps, newProps) => newProps match {
    case Some((cnt, (gender, inc))) => Some(
      if (oldProps._1 == 1) gender.toDouble / cnt
      else  1 - gender.toDouble / cnt,
      inc.toDouble / cnt
    )
    case _ => None
  })

如果您只对值感兴趣,您可以在 aggregateMessages 中传递所有必需的值并省略第二个 outerJoinVertices

【讨论】:

  • 我唯一的问题是了解如何从数据框而不是 rdd 开始添加属性...我可以将数据框映射到 VertexRDD[Long, Row] 但是你的代码休息...
  • 您能详细说明一下吗?我遇到的一个问题是您示例中的users RDD 不正确。
  • 没什么好说的,抱歉……你写了“aggregateMessgenders”,我复制并粘贴了它……:)
  • 好的,所以我在这方面做了很多尝试,但惨遭失败......我不明白 aggregateMessage 的事情......我没有像 (age2, inc2) 这样的两个值,但是在我的数据框中还有更多...我可以在这里传递的是整行,但是聚合消息不起作用...您可以在代码中添加一些 cmets 以便我更好地了解聚合消息的工作原理吗?跨度>
  • 我已经发布了完整的说明如何附加其他属性here
猜你喜欢
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 2016-05-26
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
相关资源
最近更新 更多