【问题标题】:Why does using broadcast increase time?为什么使用广播会增加时间?
【发布时间】:2016-09-02 15:41:59
【问题描述】:

这是伪代码:

case class IpDB(startIp: Long, endIp: Long, company: String)
def ipMap(line: Array[String]):
    val Array(startIp, endIp, company) = line
    IpDB(startIp.toLong, endIp.toLong, company)
// The ip-db is just 300M in raw format.
// format of ip-data from s3:
// 100000 200000 GOOGLE
val ipData = sc.broadcast(sc.textFile("s3://....").map(_.split("\t", -1)).map(ipMap).collect().sortWith(_.startIp < _.startIp)).value

val dataA = sc.textFile("s3://...").map((ip, 1)).reduceByKey(_+_)
val dataB = sc.textFile("s3://...").map((ip, 1)).reduceByKey(_+_)

// will do ip company lookup here
dataA.fullOuterJoin(dataB).map(doIpCompanyLookUp(ipData, _)).collect()

代码只是从输入源获取ip数据,加入后找到它的公司。 这是我的问题:

这段代码将在生产中运行 2-3 分钟,但是当删除广播数据时(只需加入两个数据),它只花费不到 1 分钟。当我查看 spark 的 ui 时,我发现 gc时间可能是问题所在。

这是运行此作业的设置:

spark-submit --master yarn --deploy-mode client --driver-memory 4g --num-executors 10 --executor-memory 8800m --executor-cores 4 --class ... XX.jar 

此作业正在 aws emr spark 集群上运行

spark version: 1.6.1
10 m3.xlarge.
  1. 如何解决这个问题(减少运行时间)?
  2. Spark 中广播数据消耗哪些内存?
  3. 为什么当我更改执行器内存时运行时间没有改变?我尝试使用 5*m3.2xlarge 和 --executor-memory 16g,广播数据的总运行时间没有显着变化。

更新:

case class IpDB(startIp: Long, endIp: Long, company: String)
def ipMap(line: Array[String]):
    val Array(startIp, endIp, company) = line
    IpDB(startIp.toLong, endIp.toLong, company)
// The ip-db is just 300M in raw format.
// format of ip-data from s3:
// 100000 200000 GOOGLE
val dataA = sc.textFile("s3://...").map((ip, 1)).reduceByKey(_+_)
val dataB = sc.textFile("s3://...").map((ip, 1)).reduceByKey(_+_)

// will do ip company lookup here
val joinResult = dataA.fullOuterJoin(dataB)
val ipData = sc.broadcast(sc.textFile("s3://....").map(_.split("\t", -1)).map(ipMap).collect().sortWith(_.startIp < _.startIp)).value
joinResult.map(doIpCompanyLookUp(ipData, _)).collect()

只是将ip-company数据的生成和广播移到dataA.fullOuterJoin(dataB)之后。运行时间大大减少。

更新2。 由于生产代码比较复杂,与上面的伪代码不同,在代码顺序稍加改动后,程序运行速度更快,但我不确定问题的关键是初始化广播数据的位置。

【问题讨论】:

  • 也许这有点幼稚,但是广播开始在网络周围发送 300M 并阻塞执行器。你能试着找到 DAG 吗?

标签: scala apache-spark


【解决方案1】:

不用多想你的代码:

val ipData = sc.broadcast(sc.textFile("s3://....").map(_.split("\t", -1)).map(ipMap).collect().sortWith(_.startIp < _.startIp)).value

让我很担心。

您正在使用sc.textFile 构建一个分布式RDD,只是为了通过collect() 将其本地化(对驱动程序),然后通过sc.broadcast 将其再次分发并提供给执行程序(!)如您所见有很多来回发送数据。

你最好cacheing IP 数据以便它保留在内存中,但是这样做:

sc.textFile("s3://....").map(_.split("\t", -1)).map(ipMap).cache

【讨论】:

  • 当您将整个 ip-company 数据的 broadcast 替换为 cache 时会发生什么?这对时间有何影响?
  • 由于我需要做 ip-company 查询,RDD[IpDB] 的数据类型就帮不上忙了。这就是为什么我需要广播所有数据以确保所有执行者都有 ip-company 数据的副本。只要确保我得到broadcastcache 之间的区别。所以我无法在程序中使用cache 使其工作。
猜你喜欢
  • 1970-01-01
  • 2023-03-22
  • 1970-01-01
  • 1970-01-01
  • 2015-09-18
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 2020-03-27
相关资源
最近更新 更多