【问题标题】:Spark - Simple GraphX program taking long time to completeSpark - 简单的 GraphX 程序需要很长时间才能完成
【发布时间】:2017-08-24 16:09:08
【问题描述】:

我有一个 9 节点 m3.xlarge (8 cpu / 15 gig) EMR 集群,其中 1 个节点是主节点,其他 8 个节点是从节点。我正在尝试运行一个简单的程序来检查 GraphX 连接的组件。这是我的代码:

def main(args : Array[String]): Unit = {

    val sparkConfig = new SparkConf()
      .set("hive.exec.dynamic.partition", "true")
      .set("hive.exec.dynamic.partition.mode", "nonstrict")
      .set("hive.s3.max-client-retries", "50")
      .set("hive.s3.max-error-retries", "50")
      .set("hive.s3.max-connections", "100")
      .set("hive.s3.connect-timeout", "5m")
      .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
      .set("spark.kryo.registrationRequired", "true")
      .set("spark.kryo.classesToRegister", "org.apache.spark.graphx.impl.VertexAttributeBlock")
      .set("spark.broadcast.compress", "true")
      .set("spark.default.parallelism", "24")

     val spark = SparkSession.builder()
        .appName("Spark Hive Example")
        .enableHiveSupport()
        .config(sparkConfig)
        .getOrCreate()

    // Set Kryo for serializing
    GraphXUtils.registerKryoClasses(sparkConfig)
    val res = spark.sql("SELECT col1, col2, col3 FROM table1 limit 10000")
    val edgesRDD = res.rdd.map(row => Edge(row.getString(0).hashCode, row.getString(1).hashCode, row(2).asInstanceOf[String]))

    val res_two = spark.sql("SELECT col1 FROM table2 where col1 is not NULL and col1 != '' limit 100000")
    val vertexRDD: RDD[(VertexId, String)] = res_two.rdd.map(row => (row.getString(0).hashCode, row(0).asInstanceOf[String]))

    val graph = Graph(vertexRDD, edgesRDD)
    graph.cache()

    val connectedComponents = graph.connectedComponents().vertices

我使用以下方式在 EMR 集群上提交 jar:

spark-submit --conf spark.hadoop.fs.s3a.access.key=xxx --conf spark.hadoop.fs.s3a.secret.key=xxx --conf spark.yarn.submit.waitAppCompletion=false --class com.mypkg.SampleGraphX --master yarn --deploy-mode cluster --num-executors 12 --executor-cores 6 --executor-memory 10g --conf "spark.driver.extraJavaOptions=-Djavax.net.ssl.sessionCacheSize=1000 -Djavax.net.ssl.sessionCacheTimeout=60" --conf "spark.executor.extraJavaOptions=Djavax.net.ssl.sessionCacheSize=1000 -Djavax.net.ssl.sessionCacheTimeout=60" GraphxTest1.jar

table1 和 table2 都有数百万个条目,但我将代码限制为只能从中读取 10000 和 100000 个条目。两者都是位于 S3 上的外部表。这项工作已经运行了将近 2 天。为什么要花这么长时间?我的代码有问题吗?还是我需要更改一些配置?

另外,当我查看 UI 时,我发现即使我的集群有 64 个 vCore(每个节点 8 个核心),每个节点也只使用 1 个核心。 在下图中,总共 64 个内核中只有 3 个被使用。我已经看到最多有 6 个内核(当 6 个节点处于活动状态时)。为什么不使用所有核心?

我是 Spark 和 GraphX 的新手,所以我不确定自己做错了什么。

【问题讨论】:

    标签: apache-spark emr spark-graphx


    【解决方案1】:

    您不是在看 Spark UI,这是调度程序 UI。您应该在列表中找到您的应用程序,并访问链接到您的应用程序的 URL。 当您访问 Spark UI 时,更容易判断发生了什么(存储中内存不足、分区不足、卡在读取数据中、卡在迭代中……)

    【讨论】:

    • 日志是这样写的:17/08/22 23:17:48 WARN YarnSchedulerBackend$YarnSchedulerEndpoint: Container marked as failed: container_1502391082530_0027_01_000007 on host: .... Exit status: 1. Diagnostics: Exception from container-launch. Container id: container_1502391082530_0027_01_000007 Exit code: 1 ... Container exited with a non-zero exit code 1。是不是配置问题?我是否使用了太多内存?
    猜你喜欢
    • 1970-01-01
    • 2014-01-15
    • 1970-01-01
    • 2018-02-07
    • 2016-07-30
    • 2013-07-18
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多