【问题标题】:Reduce computational time in Spark application减少 Spark 应用程序中的计算时间
【发布时间】:2016-06-22 07:07:27
【问题描述】:

我有一个迭代运行超过 500 万个元素的 Spark 应用程序。 该应用程序需要 2 小时才能在整个数据集上运行。但我必须在超过 5000 万个元素的整个数据集上运行应用程序。
代码运行成功,但问题是我的大部分程序都在驱动程序上运行,而执行程序在运行应用程序时发挥的作用很小。 因此,这个迭代应用程序的计算时间非常大。
该应用程序通过从 n-triples 数据集构建图形来查找连接的组件。
问题是executor没有接收任务,第一个for循环一直运行到500万个元素全部完成,所以这部分需要大约90%的时间,所以我主要需要优化这部分。
建议更改以将工作从驱动程序转移到执行程序,从而使此代码具有可扩展性,从而显着减少计算时间。

import scala.io.Source 
import org.apache.spark.SparkContext
import org.apache.spark.SparkConf
import org.apache.spark.SparkContext._
import org.apache.spark.graphx._
import org.apache.spark.rdd.RDD
import scala.collection.mutable.ListBuffer
import scala.collection.mutable.HashMap
import scala.collection.mutable.ArrayBuffer

object Wisdom {

val componentLists = HashMap[VertexId, ListBuffer[VertexId]]()
val prefLabelMap =  HashMap[VertexId, String]()

def main(args: Array[String]) {

val conf = new SparkConf()
val sc = new SparkContext(conf)

val tripleEndingPattern = """\s*\.\s*$""".r    
val languageTagPattern = "@[\\w-]+".r    

var edgeArray = Array(Edge(0L,0L,"http://dummy/URI"))
var literalPropsTriplesArray = new Array[(Long,Long,String)](0)
var vertexArray = new Array[(Long,String)](0)

val source = sc.textFile("hdfs://ec2-54-172-85-190.compute-1.amazonaws.com:54310/akshat/datas.nt")
val lines = source.toArray

var vertexURIMap = new HashMap[String, Long];

var triple = new Array[String](3)
var nextVertexNum = 0L
for (i <- 0 until lines.length) {

    lines(i) = tripleEndingPattern.replaceFirstIn(lines(i)," ")  
    triple = lines(i).mkString.split(">\\s+")       
    val tripleSubject = triple(0).substring(1)   
    val triplePredicate = triple(1).substring(1) 
    if (!(vertexURIMap.contains(tripleSubject))) {
        vertexURIMap(tripleSubject) = nextVertexNum
        nextVertexNum += 1
    }
    if (!(vertexURIMap.contains(triplePredicate))) {
        vertexURIMap(triplePredicate) = nextVertexNum
        nextVertexNum += 1
    }
    val subjectVertexNumber = vertexURIMap(tripleSubject)
    val predicateVertexNumber = vertexURIMap(triplePredicate)

    if (triple(2)(0) == '<') { 
        val tripleObject = triple(2).substring(1)   
        if (!(vertexURIMap.contains(tripleObject))) {
            vertexURIMap(tripleObject) = nextVertexNum
            nextVertexNum += 1
        }
        val objectVertexNumber = vertexURIMap(tripleObject)
        edgeArray = edgeArray :+
            Edge(subjectVertexNumber,objectVertexNumber,triplePredicate)
    }
    else {
        literalPropsTriplesArray = literalPropsTriplesArray :+
            (subjectVertexNumber,predicateVertexNumber,triple(2))
    }
}

for ((k, v) <- vertexURIMap) vertexArray = vertexArray :+  (v, k)  

for (i <- 0 until literalPropsTriplesArray.length) {
    if (literalPropsTriplesArray(i)._2 ==
        vertexURIMap("http://www.w3.org/2000/01/rdf-schema#label")) {

        val prefLabel =
            languageTagPattern.replaceFirstIn(literalPropsTriplesArray(i)._3,"")
        prefLabelMap(literalPropsTriplesArray(i)._1) = prefLabel;
    }
}

val vertexRDD: RDD[(Long, String)] = sc.parallelize(vertexArray)

val edgeRDD: RDD[Edge[(String)]] =
    sc.parallelize(edgeArray.slice(1,edgeArray.length))

val literalPropsTriplesRDD: RDD[(Long,Long,String)] =
    sc.parallelize(literalPropsTriplesArray)

val graph: Graph[String, String] = Graph(vertexRDD, edgeRDD)

val skosRelatedSubgraph =
    graph.subgraph(t => t.attr ==
                   "http://purl.org/dc/terms/subject")

val ccGraph = skosRelatedSubgraph.connectedComponents()

ccGraph.vertices.saveAsTextFile("hdfs://ec2-54-172-85-190.compute-1.amazonaws.com/akshat/outp")

sc.stop
}
}

【问题讨论】:

  • 如果你放一些 cmets 或者至少放一个总结告诉你的代码在做什么,那就太好了。它会让其他人轻松阅读它
  • 分析最慢的部分并指出来,然后可能有人可以提供帮助
  • 我同意@PreetiKhurana。此外,此代码未正确使用 spark,您正在读取带有 spark 上下文的文本文件以在之后收集它。我投票结束这个问题,因为它太宽泛而无法回答,而且不清楚你在问什么,等等。

标签: scala apache-spark distributed-computing connected-components


【解决方案1】:

您正在使用 for 循环,您必须使用带有 RDD 的地图(代码中的源代码)。然后执行者进入画面,任务将被共享。 Spark 并非旨在使用 for 循环来遍历输入文件中的行。请做好你的基础。快乐学习

【讨论】:

    猜你喜欢
    • 2020-12-06
    • 2018-07-12
    • 2018-03-25
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2014-03-21
    • 1970-01-01
    • 2011-05-12
    相关资源
    最近更新 更多