【发布时间】:2015-12-09 20:41:31
【问题描述】:
我对 Spark 和 Scala 比较陌生...我有一个图:Graph[Int, String],我想将我在 DataFrame 中的一些属性附加到这些顶点。
我需要做的是,对于每个顶点,找到每个属性在邻域中的平均值。到目前为止,这是我的方法,但我不明白如何正确映射从两个数据框的连接中获得的 Row:
val res = graph.collectNeighbors(EdgeDirection.Either)
.toDF("ID", "neighbours")
.join(aDataFrameWithProperties, "ID")
.map{x => // this is where I am lost
}
我认为我的方法不正确,因为我将每个顶点的属性与其邻居的数组连接起来,但我仍然不知道邻居的属性值...
编辑
一些有助于理解我想要完成的数据...假设您按照how to create EdgeRDD from data frame in Spark 的答案构建图表
val sqlc : SQLContext = ???
case class Person(id: Long, country: String, age: Int)
val testPeople = Seq(
Person(1, "Romania" , 15),
Person(2, "New Zealand", 30),
Person(3, "Romania" , 17),
Person(4, "Iceland" , 20),
Person(5, "Romania" , 40),
Person(6, "Romania" , 44),
Person(7, "Romania" , 45),
Person(8, "Iceland" , 21),
Person(9, "Iceland" , 22)
)
val people = sqlc.createDataFrame(testPeople)
val peopleR = people
.withColumnRenamed("id" , "idR")
.withColumnRenamed("country", "countryR")
.withColumnRenamed("age" , "ageR")
import org.apache.spark.sql.functions._
val relations = people.join(peopleR,
(people("id") < peopleR("idR")) &&
(people("country") === peopleR("countryR")) &&
(abs(people("age") - peopleR("ageR")) < 5))
import org.apache.spark.graphx._
val edges = EdgeRDD.fromEdges(relations.map(row => Edge(
row.getAs[Long]("id"), row.getAs[Long]("idR"), ())))
val users = VertexRDD.apply(people.map(row => (row.getAs[Int]("id").toLong, row.getAs[Int]("id").toInt)))
val graph = Graph(users, edges)
然后你有一个像这样的数据框:
case class Person(id:Long, gender:Int, income:Int)
val properties = Seq(
Person(1, 0, 321),
Person(2, 1, 212),
Person(3, 0, 212),
Person(4, 0, 122),
Person(5, 1, 898),
Person(6, 1, 212),
Person(7, 1, 22),
Person(8, 0, 8),
Person(9, 0, 212)
)
val people = sqlc.createDataFrame(properties)
我想计算每个顶点的平均性别是多少,邻居的平均收入是多少,以 DataFrame 形式返回
【问题讨论】:
-
为什么要将属性与图表分开?这里的预期输出到底是什么?
-
当我构建图表时,我有节点但我还没有属性...我可以“附加”属性吗?预期的结果是一个带有 ID 列和每个属性的一列的 DataFrame
-
所以您不想要图表?这听起来像是
aggregateMessages而不是 DataFrames 的工作。无论如何,你能显示一些数据吗? -
不,我不想返回图表,如果这是你的意思...我会在编辑中放入一些数据
标签: scala apache-spark spark-graphx