【发布时间】:2018-01-20 03:27:15
【问题描述】:
我正在用 scala 编写一个关于 Spark 的程序。它用于计算键的数量。这是数据示例:
名称 水果店 一个苹果中国 一个苹果中国 一个苹果美国 一根香蕉英国 B苹果日本 B橙智利 C苹果法语
这是一个多列的数据框,但我只关心以上三列,所以可能会有一些重复记录。我想统计一下,比如A吃的水果的产地数量。
val res = data.select("name","fruit","place")
.map(v=>((v.getString(0),v.getString(1)),ArrayBuffer(v.getString(2)))).rdd.reduceByKey((a,b)=>a++=b)
.map(v=>(v._1._1,Map(v._1._2 -> v._2.toSet.size))).reduceByKey((a,b)=>a++=b)
我首先选择我需要的列,然后以("name", "fruit")为key,将每个人吃的每种水果的产地收集到一个ArrayBuffer中。然后我使用“名称”作为关键字,在 {“apple”:2} 这样的地图中收集每个水果的产地数量。所以结果非正式地类似于 RDD[("name",Map("fruit"->"places count"))]。
在程序中,我做了大约 3 次 来计算类似于上述示例的信息。例如,计算一个产地每个人吃的不同水果的数量。
数据大小约为 80GB,我在 50 个执行器上运行该作业。每个执行器有 4 个内核和 24GB 的内存。此外,数据被重新划分为 200 个分区。所以这项工作应该如我所料在很短的时间内完成。但是,我花了一天多的时间来运行该作业并失败,因为 org.apache.spark.shuffle.MetadataFetchFailedException: Missing an output location for shuffle 10 和 java.lang.OutOfMemoryError:超出 GC 开销限制。
我做了很多事情来优化这个程序,比如重置 spark.mesos.executor.memoryOverhead 并使用可变映射来最小化频繁创建和清理对象的 GC 成本。我什至尝试使用 reduceByKey 将具有相同键的数据移动到一个分区中以提高性能,但几乎没有帮助。代码如下:
val new_data = data.map(v=>(v.getAs[String]("name"),ArrayBuffer((v.getAs[String]("fruit"),v.getAs[String]("place")))))
.rdd.reduceByKey((a,b)=>a++=b).cache()
然后我不需要在每次进行类似计算时都对数据进行洗牌。而后面的工作可以在new_data的基础上进行。但是,这种优化似乎不起作用。
最后,我发现大约有 50% 的数据在“name”字段上具有相同的值,比如“H”。我删除了名为“H”的数据,工作在 1 小时内完成。
这是我的问题:
为什么key的分布对reduceByKey的性能影响如此之大?我用“分布”这个词来表示不同键的出现次数。在我的情况下,数据的大小并不大,但是一个键控制了数据,因此性能受到很大影响。我认为这是 reduceByKey 的问题,我错了吗?
如果我必须保留名称为“H”的记录,如何避免性能问题?
是否可以使用reduceByKey对数据进行重新分区并将具有相同键(“名称”)的记录放入一个分区中?
将具有相同键(“名称”)的记录移动到一个分区以提高性能真的有帮助吗?我知道这可能会导致内存问题,但我必须在程序中多次运行类似的代码,所以我想它可能对以后的工作有所帮助。我说的对吗?
感谢您的帮助!
【问题讨论】:
-
我最好的猜测是,当你通过 key 减少时,所有名为“H”的记录都被发送到一个节点(40GB 的数据!),然后一个节点尝试完成所有工作(大量数据溢出到磁盘上)
-
@bendl 感谢您的评论。是的,我想是的,但即使有 40GB 的数据,执行程序的内存也是 24GB,这意味着它不应该那么慢(超过 1 天)。你有什么想法吗?
标签: scala apache-spark spark-dataframe