【发布时间】:2015-09-17 21:09:40
【问题描述】:
我正在我的小项目下工作,该项目使用图形作为主要结构。图由具有这种结构的顶点组成:
class SWVertex[T: ClassTag](
val id: Long,
val data: T,
var neighbors: Vector[Long] = Vector.empty[Long],
val timestamp: Timestamp = new Timestamp(System.currentTimeMillis())
) extends Serializable {
def addNeighbor(neighbor: Long): Unit = {
if (neighbor >= 0) { neighbors = neighbors :+ neighbor }
}
}
注意事项:
- 会有很多顶点,我想可能超过
MAX_INT。 - 每个顶点都有一个可变的邻居数组(它们只是另一个顶点的 ID)。
- 添加顶点到图中的特殊功能是使用 BFS 算法在图中选择最佳顶点来连接新顶点 - 修改现有的并添加顶点的邻居数组。
我决定使用 Apache Spark 和 Scala 来处理和浏览我的图表,但我遇到了一些误解:我知道,RDD 是一个并行数据集,我是使用 parallelize() 从主集合中创建的方法,我发现,修改源集合也会影响创建的 RDD。我用这段代码找到了答案:
val newVertex1 = new SWVertex[String](1, "test1")
val newVertex2 = new SWVertex[String](2, "test2")
var vertexData = Seq(newVertex1, newVertex2)
val testRDD1 = sc.parallelize(vertexData, vertexData.length)
testRDD1.collect().foreach(
f => println("| ID: " + f.id + ", data: " + f.data + ", neighbors: "
+ f.neighbors.mkString(", "))
)
// The result is:
// | ID: 1, data: test1, neighbors:
// | ID: 2, data: test2, neighbors:
// Calling simple procedure, that uses `addNeighbor` on both parameters
makeFriends(vertexData(0), vertexData(1))
testRDD1.collect().foreach(
f => println("| ID: " + f.id + ", data: " + f.data + ", neighbors: "
+ f.neighbors.mkString(", "))
)
// Now the result is:
// | ID: 1, data: test1, neighbors: 2
// | ID: 2, data: test2, neighbors: 1
,但我没有找到使用 RDD 方法制作相同内容的方法(老实说,由于 RDD 的不可变性,我什至不确定这是否可能)。在这种情况下,问题是:
有什么办法可以处理这么大的数据量,保持访问随机顶点以修改其邻居列表和不断追加新顶点的能力?
我认为解决方案必须是使用某种Vector 数据结构,在这种情况下我还有一个问题:
是否可以将 Scala 结构存储在集群内存中?
附注。我计划至少使用 Spark 来处理 BFS 搜索,但我很高兴听到任何其他建议。
附言。我读过.view 创建“惰性”集合转换的方法,但仍然不知道如何使用它...
更新 1:就我正在阅读 Scala Cookbook 而言,我认为选择 Vector 将是最佳选择,因为在我的情况下使用图形意味着大量随机访问顶点又名图的元素并附加新顶点,但仍然 - 我不确定将Vector 用于如此大量的顶点不会导致OutOfMemoryException
更新 2:在上面的测试中,我发现内存发生了一些有趣的事情。这是交易(请记住,我使用的是单节点 Spark 集群):
// Test were performed using these lines of code:
val runtime = Runtime.getRuntime
var usedMemory = runtime.totalMemory - runtime.freeMemory
// In the beginning of my work, before creating vertices and collection:
usedMemory = 191066456 bytes // ~182 MB, 1st run
usedMemory = 173991072 bytes // ~166 MB, 2nd run
// After creating collection with two vertices:
usedMemory = 191066456 bytes // ~182 MB, 1st run
usedMemory = 173991072 bytes // ~166 MB, 2nd run
// After creating testRDD1
usedMemory = 191066552 bytes // ~182 MB, 1st run
usedMemory = 173991168 bytes // ~166 MB, 2nd run
// After performing first testRDD1.collect() function
usedMemory = 212618296 bytes // ~203 MB, 1st run
usedMemory = 200733808 bytes // ~191 MB, 2nd run
// After calling makeFriends on source collection
usedMemory = 212618296 bytes // ~203 MB, 1st run
usedMemory = 200733808 bytes // ~191 MB, 2nd run
// After calling testRDD1.collect() for modified collection
usedMemory = 216645128 bytes // ~207 MB, 1st run
usedMemory = 203955264 bytes // ~195 MB, 2nd run
我知道这个测试量太少,无法确定我的结论,但我注意到:
- 创建集合时没有任何反应。
- 在此示例上创建 RDD 后,分配了 96 个字节,可能用于存储分区数据之类的。
- 当我调用
.collect()方法时分配了最多的内存,因为我基本上将所有数据收集到一个节点,并且可能因为单节点Spark安装,我得到了双份数据(不确定这里),它占用了大约 23 MB 的内存。 - 有趣的时刻发生在修改邻居的数组之后,这需要额外的 4 MB 内存来存储它们。
【问题讨论】:
-
你考虑过 GraphX 吗? spark.apache.org/docs/latest/graphx-programming-guide.html
-
@maasg 是的,我读过关于 GraphX 的文章,但我认为我不能在我的情况下使用这个解决方案,因为我需要能够更新
Graph中的顶点。据我所知,一旦创建Graph是完全不可变的,它不允许执行诸如附加新顶点之类的操作。还是我错了?
标签: scala collections graph apache-spark scala-collections