【发布时间】:2019-01-18 21:25:05
【问题描述】:
我有一个在 Mesos 集群中运行的 Spark 2.1.1 作业。 Spark UI 显示 32 个活动执行器,RDD.getNumPartitions 显示 28 个分区。 但是只有一个(随机)执行者在做任何工作,所有其他人都被标记为已完成。 我在执行程序代码(stdout)中添加了调试语句,只有一个执行程序显示了这些。整个管道的结构如下: 获取 id 列表 -> 为每个 id 下载 JSON 数据 -> 解析 JSON 数据 -> 保存到 S3。
stage 1: val ids=session.sparkContext.textFile(path).repartition(28) -> RDD[String]
//ids.getNumPartitions shows 28
stage 2: val json=ids.mapPartitions { keys =>
val urlBuilder ...
val buffer ....
keys map { key =>
val url=urlBuilder.createUrl(id) //java.net.URL
val json=url.openStream() ... //download text to buffer, close stream
(id,json.toString)
}
} -> RDD[Tuple2[String,String]]
stage 3: val output = json flatMap { t =>
val values = ... //parse JSON, get values from JSON or empty sequence if not found
values map { value => (t._1, value) }
} -> RDD[Tuple2[String,String]]
stage 4: output.saveAsTextFile("s3://...")
这些是 Spark 二进制文件的配置设置: --driver-memory 32g --conf spark.driver.cores=4 --executor-memory 4g --conf spark.cores.max=128 --conf spark.executor.cores=4
仅在一个执行器上运行的阶段是第二个。 我在第一步中明确指定了分区数(repartition(28))。 有没有人见过这样的行为? 谢谢,
M
解决方案
我采取了另一种方式(请参阅 Travis 的建议)并将分区数量(在第 1 步之后)增加到 100。这很奏效,工作在几分钟内完成。但有一个副作用 - 现在我在 S3 中有 100 个部分文件。
【问题讨论】:
-
你能提供一个代码和你用来开始你的工作的命令吗?
标签: apache-spark partition executor