【问题标题】:Scala + Spark collections interactionsScala + Spark 集合交互
【发布时间】:2015-09-17 21:09:40
【问题描述】:

我正在我的小项目下工作,该项目使用图形作为主要结构。图由具有这种结构的顶点组成:

class SWVertex[T: ClassTag](
   val id: Long, 
   val data: T, 
   var neighbors: Vector[Long] = Vector.empty[Long], 
   val timestamp: Timestamp = new Timestamp(System.currentTimeMillis())
) extends Serializable { 
   def addNeighbor(neighbor: Long): Unit = {
      if (neighbor >= 0) { neighbors = neighbors :+ neighbor }
   }
}

注意事项

  1. 会有很多顶点,我想可能超过MAX_INT
  2. 每个顶点都有一个可变的邻居数组(它们只是另一个顶点的 ID)。
  3. 添加顶点到图中的特殊功能是使用 BFS 算法在图中选择最佳顶点来连接新顶点 - 修改现有的并添加顶点的邻居数组。

我决定使用 Apache Spark 和 Scala 来处理和浏览我的图表,但我遇到了一些误解:我知道,RDD 是一个并行数据集,我是使用 parallelize() 从主集合中创建的方法,我发现,修改源集合也会影响创建的 RDD。我用这段代码找到了答案:

val newVertex1 = new SWVertex[String](1, "test1")
val newVertex2 = new SWVertex[String](2, "test2")
var vertexData = Seq(newVertex1, newVertex2)

val testRDD1 = sc.parallelize(vertexData, vertexData.length)

testRDD1.collect().foreach(
   f => println("| ID: " + f.id + ", data: " + f.data + ", neighbors: "
   + f.neighbors.mkString(", "))
)

// The result is:
// | ID: 1, data: test1, neighbors: 
// | ID: 2, data: test2, neighbors: 


// Calling simple procedure, that uses `addNeighbor` on both parameters
makeFriends(vertexData(0), vertexData(1))

testRDD1.collect().foreach(
   f => println("| ID: " + f.id + ", data: " + f.data + ", neighbors: "
   + f.neighbors.mkString(", "))
)

// Now the result is:
// | ID: 1, data: test1, neighbors: 2
// | ID: 2, data: test2, neighbors: 1

,但我没有找到使用 RDD 方法制作相同内容的方法(老实说,由于 RDD 的不可变性,我什至不确定这是否可能)。在这种情况下,问题是:

有什么办法可以处理这么大的数据量,保持访问随机顶点以修改其邻居列表和不断追加新顶点的能力?

我认为解决方案必须是使用某种Vector 数据结构,在这种情况下我还有一个问题:

是否可以将 Scala 结构存储在集群内存中?

附注。我计划至少使用 Spark 来处理 BFS 搜索,但我很高兴听到任何其他建议。

附言。我读过.view 创建“惰性”集合转换的方法,但仍然不知道如何使用它...

更新 1:就我正在阅读 Scala Cookbook 而言,我认为选择 Vector 将是最佳选择,因为在我的情况下使用图形意味着大量随机访问顶点又名图的元素并附加新顶点,但仍然 - 我不确定将Vector 用于如此大量的顶点不会导致OutOfMemoryException

更新 2:在上面的测试中,我发现内存发生了一些有趣的事情。这是交易(请记住,我使用的是单节点 Spark 集群):

// Test were performed using these lines of code:
val runtime = Runtime.getRuntime
var usedMemory = runtime.totalMemory - runtime.freeMemory

// In the beginning of my work, before creating vertices and collection:
usedMemory = 191066456 bytes // ~182 MB, 1st run 
usedMemory = 173991072 bytes // ~166 MB, 2nd run
// After creating collection with two vertices:
usedMemory = 191066456 bytes // ~182 MB, 1st run
usedMemory = 173991072 bytes // ~166 MB, 2nd run
// After creating testRDD1
usedMemory = 191066552 bytes // ~182 MB, 1st run 
usedMemory = 173991168 bytes // ~166 MB, 2nd run
// After performing first testRDD1.collect() function
usedMemory = 212618296 bytes // ~203 MB, 1st run 
usedMemory = 200733808 bytes // ~191 MB, 2nd run
// After calling makeFriends on source collection
usedMemory = 212618296 bytes // ~203 MB, 1st run 
usedMemory = 200733808 bytes // ~191 MB, 2nd run
// After calling testRDD1.collect() for modified collection
usedMemory = 216645128 bytes // ~207 MB, 1st run 
usedMemory = 203955264 bytes // ~195 MB, 2nd run

我知道这个测试量太少,无法确定我的结论,但我注意到:

  1. 创建集合时没有任何反应。
  2. 在此示例上创建 RDD 后,分配了 96 个字节,可能用于存储分区数据之​​类的。
  3. 当我调用.collect()方法时分配了最多的内存,因为我基本上将所有数据收集到一个节点,并且可能因为单节点Spark安装,我得到了双份数据(不确定这里),它占用了大约 23 MB 的内存。
  4. 有趣的时刻发生在修改邻居的数组之后,这需要额外的 4 MB 内存来存储它们。

【问题讨论】:

  • @maasg 是的,我读过关于 GraphX 的文章,但我认为我不能在我的情况下使用这个解决方案,因为我需要能够更新 Graph 中的顶点。据我所知,一旦创建 Graph 是完全不可变的,它不允许执行诸如附加新顶点之类的操作。还是我错了?

标签: scala collections graph apache-spark scala-collections


【解决方案1】:

让我在这里尝试解决不同的问题:

RDD 是一个并行数据集,我从主集合中使用 parallelize() 方法,我发现,修改源 collection 也会对创建的 RDD 产生影响。

RDD 是并行的分布式 数据集。 parallelize 允许您获取本地集合并将其分发到集群上。您观察到的当前行为是,在对底层对象进行变异时,RDD 表示也会发生变异,这仅仅是因为程序当前在 1 个节点中运行。在集群中,这种行为不可能

不变性是“垂直”分布计算的关键:在同一处理器的多个内核上或“水平”分布:在集群中的多台机器上。

我没有找到使用 RDD更新图结构的方法 方法

要实现这一点,您需要根据分布式集合重新考虑图形结构。在当前的 OO 模型中,每个 Vertex 都包含自己的相邻顶点列表,并且需要对对象进行突变才能构建图。 我们需要通过仅使用其属性创建顶点并将关系外部化为边列表来使顶点不可变。简而言之,这就是GraphX 所做的。你的 Edge 看起来像:

case class Vertex[T: ClassTag](
   val id: Long, 
   val data: T, 
   val timestamp: Timestamp = new Timestamp(System.currentTimeMillis())
)

然后我们可以建立一个边的集合:

val Edges:RDD[(Long, Long)] // (Source Vertex Id, Dest Vertex Id)

那么,给定:

val usr1 = Vertex(1, "SuppieRK")
val usr2 = Vertex(2, "maasg")
val usr3 = Vertex(3, "graphy")
val usr4 = Vertex(4, "spark")

还有一些初始关系:

val edgeSeq = Seq((1,2), (2,3))

以及这种关系的RDD:

val relations = sparkContext.parallelize(edgeSeq)

那么添加新关系将意味着创建新边:

val newRelations = sparkContext.parallelize(Seq((1,4),(2,4),(3,4))

union-ing 这些集合在一起。

val allRel = relations.union(newRelations)

这就是“addFriend”的实现方式,但我们可能会从某个地方读取该数据。此方法不能用于对 Edges 集合进行逐一添加。您正在使用 Spark,因为要考虑的数据集非常大,并且您需要能够将计算分布在多台机器上。

如果集合适合一个节点,我会坚持使用“标准”Scala 表示和算法。

【讨论】:

  • 感谢您的回答,但我仍有一些问题要问:1. 我对您的EdgesVertices 有点迷失方向...你能解释一下为什么Edge 包含Vertex 属性吗? 2. 我得到了不变性的概念,它对分布式计算有好处,但我还有另一个问题:像uniondistinct 这样的操作实际上有多快?等方面的性能?内存使用情况? 3. 如果我尝试添加大量新顶点怎么办 - 我应该制作一种buffer 作为它们的临时存储,然后将这个buffer 合并到我的@ 987654339@?
  • @SuppieRK (1) 关于 Edge w/ Vertex 属性:抱歉,我把两者混为一谈(语言混乱)——我已经解决了。 (2) 性能:Spark 与传统 Hadoop 相比非常快,但会比 all in 1 机器慢。数据的序列化和任务的协调将付出代价。 (3) 从顶点中创建一个 RDD。最好的办法是从一些分布式 FS 中读取它们。如果它们是以编程方式创建的,您将被绑定到 1 台机器的可用内存 - 也有一些方法可以解决这个问题,例如将范围发送到集群节点并让每个节点创建这些对象。
  • 好的,我明白了,非常感谢您的解释!我会尝试为我挖掘更多所需的信息。
猜你喜欢
  • 2021-04-04
  • 2013-03-25
  • 2014-06-02
  • 2015-05-17
  • 1970-01-01
  • 2017-05-28
  • 2015-01-29
  • 1970-01-01
  • 1970-01-01
相关资源
最近更新 更多