【发布时间】:2016-08-24 16:22:36
【问题描述】:
我遇到了一个问题,即 YARN 因超出内存限制而杀死我的容器:
Container killed by YARN for exceeding memory limits. physical memory used. Consider boosting spark.yarn.executor.memoryOverhead.
我有 20 个 m3.2xlarge 的节点,所以它们有:
cores: 8
memory: 30
storage: 200 gb ebs
我的应用程序的要点是我有几个 100k 资产,我为这些资产生成了去年每小时的历史数据,未压缩的数据集总大小为 2TB。我需要使用这些历史数据来生成每个资产的预测。我的设置是我首先使用 s3distcp 将存储为索引 lzo 文件的数据移动到 hdfs。然后我将数据拉入并将其传递给 sparkSql 以处理 json:
val files = sc.newAPIHadoopFile("hdfs:///local/*",
classOf[com.hadoop.mapreduce.LzoTextInputFormat],classOf[org.apache.hadoop.io.LongWritable],
classOf[org.apache.hadoop.io.Text],conf)
val lzoRDD = files.map(_._2.toString)
val data = sqlContext.read.json(lzoRDD)
然后我使用 groupBy 按资产对历史数据进行分组,创建一个 (assetId,timestamp,sparkSqlRow) 的元组。我认为这种数据结构在生成每个资产的预测时可以更好地进行内存操作。
val p = data.map(asset => (asset.getAs[String]("assetId"),asset.getAs[Long]("timestamp"),asset)).groupBy(_._1)
然后我使用 foreach 遍历每一行,计算预测,最后将预测作为 json 文件写回 s3。
p.foreach{ asset =>
(1 to dateTimeRange.toStandardHours.getHours).foreach { hour =>
// determine the hour from the previous year
val hourFromPreviousYear = (currentHour + hour.hour) - timeRange
// convert to seconds
val timeToCompare = hourFromPreviousYear.getMillis
val al = asset._2.toList
println(s"Working on asset ${asset._1} for hour $hour with time-to-compare: $timeToCompare")
// calculate the year over year average for the asset
val yoy = calculateYOYforAsset2(al, currentHour, asset._1)
// get the historical data for the asset from the previous year
val pa = asset._2.filter(_._2 == timeToCompare)
.map(row => calculateForecast(yoy, row._3, asset._1, (currentHour + hour.hour).getMillis))
.foreach(json => writeToS3(json, asset._1, (currentHour + hour.hour).getMillis))
}
}
- 有没有更好的方法来实现这一点,这样我就不会遇到 YARN 的内存问题?
- 有没有办法对资产进行分块,以便 foreach 一次只在大约 10k 上运行,而对所有 200k 资产进行操作?
任何建议/帮助表示赞赏!
【问题讨论】:
-
您可能会遇到与我在此答案中给出的示例类似的问题:stackoverflow.com/a/36475604/3415409
标签: scala apache-spark apache-spark-sql emr amazon-emr