【问题标题】:Creating array per Executor in Spark and combine into RDD在 Spark 中为每个 Executor 创建数组并合并到 RDD
【发布时间】:2016-03-21 16:16:47
【问题描述】:

我正在从基于 MPI 的系统迁移到 Apache Spark。我需要在 Spark 中执行以下操作。

假设,我有n 顶点。我想从这些n 顶点创建一个边列表。一条边只是两个整数 (u,v) 的元组,不需要任何属性。

但是,我想在每个执行程序中独立地并行创建它们。因此,我想为P Spark Executors 独立创建P 边缘数组。每个数组可能有不同的大小并且取决于顶点,因此,我还需要从0n-1 的执行程序ID。接下来,我想要一个全局 RDD 边数组。

在 MPI 中,我会使用处理器等级在每个处理器中创建一个数组。如何在 Spark 中做到这一点,尤其是使用 GraphX 库?

因此,我的主要目标是在每个执行器中创建一个边数组,并将它们组合成一个 RDD。

我首先尝试了鄂尔多斯的一个修改版本——人义模型。作为参数,我只有节点数 n 和概率 p。

假设,执行器i 必须处理从101200 的节点。对于任何节点说,节点101,它将以概率p创建从101102 -- n的边。在每个执行程序创建分配的边之后,我将实例化 GraphX EdgeRDDVertexRDD。因此,我的计划是在每个executor中独立创建边缘列表,并将它们合并到RDD中。

【问题讨论】:

  • 我对MPI一无所知,但从你的描述中我可以看出你对问题的思考方式太“低级”了。在 Spark 中,您不必担心哪个 executor 存储了哪个数组。只需创建您的 RDD,Spark 就会自动处理数据的分发和处理。我还建议您阅读GraphX 的文档,因为顶点和边需要以某种方式定义才能在GraphX 中使用。
  • 感谢您的建议。我正在尝试并行实现图形生成器。生成器必须以这样一种方式创建边,以使计算负载得到很好的平衡。

标签: scala apache-spark spark-graphx


【解决方案1】:

让我们从下游处理所需的一些导入和变量开始:

import org.apache.spark._
import org.apache.spark.graphx._
import org.apache.spark.rdd.RDD
import scala.util.Random
import org.apache.spark.HashPartitioner

val nPartitions: Integer = ???
val n: Long = ??? 
val p: Double = ???

接下来,我们需要一个可用于生成边的种子 ID 的 RDD。处理这个问题的一种简单的方法是这样的:

sc.parallelize(0L to n)

由于生成的边数取决于节点 ID,这种方法会产生高度倾斜的负载。我们可以通过重新分区做得更好:

sc.parallelize(0L to n)
  .map((_, None))
  .partitionBy(new HashPartitioner(nPartitions))
  .keys

但更好的方法是从空 RDD 开始并在适当位置生成 id。我们需要一个小帮手:

def genNodeIds(nPartitions: Int, n: Long)(i: Int) = {
  (0L until n).filter(_ % nPartitions == i).toIterator
}

可以这样使用:

val empty = sc.parallelize(Seq.empty[Int], nPartitions)
val ids = empty.mapPartitionsWithIndex((i, _) => genNodeIds(nPartitions, n)(i))

只是一个快速的健全性检查(它非常昂贵,所以不要在生产中使用它):

require(ids.distinct.count == n) 

我们可以使用另一个助手生成实际的边:

def genEdgesForId(p: Double, n: Long, random: Random)(i: Long) = {
  (i + 1 until n).filter(_ => random.nextDouble < p).map(j => Edge(i, j, ()))
}

def genEdgesForPartition(iter: Iterator[Long]) = {
  // It could be an overkill but better safe than sorry
  // Depending on your requirement it could worth to
  // consider using commons-math
  // https://commons.apache.org/proper/commons-math/userguide/random.html
  val random = new Random(new java.security.SecureRandom())
  iter.flatMap(genEdgesForId(p, n, random))
}

val edges = ids.mapPartitions(genEdgesForPartition)

最后我们可以创建一个图表:

val graph = Graph.fromEdges(edges, ())

【讨论】:

  • 非常感谢,这是非常详尽的指南。这对我理解 Spark 也很有帮助。
  • 我已经简化了一点。它还应该解决您在另一个问题中提到的问题。
猜你喜欢
  • 2023-03-18
  • 2015-10-18
  • 1970-01-01
  • 1970-01-01
  • 2016-12-25
  • 2015-07-05
  • 2017-06-22
  • 1970-01-01
  • 2012-09-11
相关资源
最近更新 更多