【发布时间】:2017-08-24 16:09:08
【问题描述】:
我有一个 9 节点 m3.xlarge (8 cpu / 15 gig) EMR 集群,其中 1 个节点是主节点,其他 8 个节点是从节点。我正在尝试运行一个简单的程序来检查 GraphX 连接的组件。这是我的代码:
def main(args : Array[String]): Unit = {
val sparkConfig = new SparkConf()
.set("hive.exec.dynamic.partition", "true")
.set("hive.exec.dynamic.partition.mode", "nonstrict")
.set("hive.s3.max-client-retries", "50")
.set("hive.s3.max-error-retries", "50")
.set("hive.s3.max-connections", "100")
.set("hive.s3.connect-timeout", "5m")
.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
.set("spark.kryo.registrationRequired", "true")
.set("spark.kryo.classesToRegister", "org.apache.spark.graphx.impl.VertexAttributeBlock")
.set("spark.broadcast.compress", "true")
.set("spark.default.parallelism", "24")
val spark = SparkSession.builder()
.appName("Spark Hive Example")
.enableHiveSupport()
.config(sparkConfig)
.getOrCreate()
// Set Kryo for serializing
GraphXUtils.registerKryoClasses(sparkConfig)
val res = spark.sql("SELECT col1, col2, col3 FROM table1 limit 10000")
val edgesRDD = res.rdd.map(row => Edge(row.getString(0).hashCode, row.getString(1).hashCode, row(2).asInstanceOf[String]))
val res_two = spark.sql("SELECT col1 FROM table2 where col1 is not NULL and col1 != '' limit 100000")
val vertexRDD: RDD[(VertexId, String)] = res_two.rdd.map(row => (row.getString(0).hashCode, row(0).asInstanceOf[String]))
val graph = Graph(vertexRDD, edgesRDD)
graph.cache()
val connectedComponents = graph.connectedComponents().vertices
我使用以下方式在 EMR 集群上提交 jar:
spark-submit --conf spark.hadoop.fs.s3a.access.key=xxx --conf spark.hadoop.fs.s3a.secret.key=xxx --conf spark.yarn.submit.waitAppCompletion=false --class com.mypkg.SampleGraphX --master yarn --deploy-mode cluster --num-executors 12 --executor-cores 6 --executor-memory 10g --conf "spark.driver.extraJavaOptions=-Djavax.net.ssl.sessionCacheSize=1000 -Djavax.net.ssl.sessionCacheTimeout=60" --conf "spark.executor.extraJavaOptions=Djavax.net.ssl.sessionCacheSize=1000 -Djavax.net.ssl.sessionCacheTimeout=60" GraphxTest1.jar
table1 和 table2 都有数百万个条目,但我将代码限制为只能从中读取 10000 和 100000 个条目。两者都是位于 S3 上的外部表。这项工作已经运行了将近 2 天。为什么要花这么长时间?我的代码有问题吗?还是我需要更改一些配置?
另外,当我查看 UI 时,我发现即使我的集群有 64 个 vCore(每个节点 8 个核心),每个节点也只使用 1 个核心。 在下图中,总共 64 个内核中只有 3 个被使用。我已经看到最多有 6 个内核(当 6 个节点处于活动状态时)。为什么不使用所有核心?
【问题讨论】:
标签: apache-spark emr spark-graphx