【发布时间】:2016-09-02 15:41:59
【问题描述】:
这是伪代码:
case class IpDB(startIp: Long, endIp: Long, company: String)
def ipMap(line: Array[String]):
val Array(startIp, endIp, company) = line
IpDB(startIp.toLong, endIp.toLong, company)
// The ip-db is just 300M in raw format.
// format of ip-data from s3:
// 100000 200000 GOOGLE
val ipData = sc.broadcast(sc.textFile("s3://....").map(_.split("\t", -1)).map(ipMap).collect().sortWith(_.startIp < _.startIp)).value
val dataA = sc.textFile("s3://...").map((ip, 1)).reduceByKey(_+_)
val dataB = sc.textFile("s3://...").map((ip, 1)).reduceByKey(_+_)
// will do ip company lookup here
dataA.fullOuterJoin(dataB).map(doIpCompanyLookUp(ipData, _)).collect()
代码只是从输入源获取ip数据,加入后找到它的公司。 这是我的问题:
这段代码将在生产中运行 2-3 分钟,但是当删除广播数据时(只需加入两个数据),它只花费不到 1 分钟。当我查看 spark 的 ui 时,我发现 gc时间可能是问题所在。
这是运行此作业的设置:
spark-submit --master yarn --deploy-mode client --driver-memory 4g --num-executors 10 --executor-memory 8800m --executor-cores 4 --class ... XX.jar
此作业正在 aws emr spark 集群上运行
spark version: 1.6.1
10 m3.xlarge.
- 如何解决这个问题(减少运行时间)?
- Spark 中广播数据消耗哪些内存?
- 为什么当我更改执行器内存时运行时间没有改变?我尝试使用 5*m3.2xlarge 和 --executor-memory 16g,广播数据的总运行时间没有显着变化。
更新:
case class IpDB(startIp: Long, endIp: Long, company: String)
def ipMap(line: Array[String]):
val Array(startIp, endIp, company) = line
IpDB(startIp.toLong, endIp.toLong, company)
// The ip-db is just 300M in raw format.
// format of ip-data from s3:
// 100000 200000 GOOGLE
val dataA = sc.textFile("s3://...").map((ip, 1)).reduceByKey(_+_)
val dataB = sc.textFile("s3://...").map((ip, 1)).reduceByKey(_+_)
// will do ip company lookup here
val joinResult = dataA.fullOuterJoin(dataB)
val ipData = sc.broadcast(sc.textFile("s3://....").map(_.split("\t", -1)).map(ipMap).collect().sortWith(_.startIp < _.startIp)).value
joinResult.map(doIpCompanyLookUp(ipData, _)).collect()
只是将ip-company数据的生成和广播移到dataA.fullOuterJoin(dataB)之后。运行时间大大减少。
更新2。 由于生产代码比较复杂,与上面的伪代码不同,在代码顺序稍加改动后,程序运行速度更快,但我不确定问题的关键是初始化广播数据的位置。
【问题讨论】:
-
也许这有点幼稚,但是广播开始在网络周围发送 300M 并阻塞执行器。你能试着找到 DAG 吗?
标签: scala apache-spark