【问题标题】:Spark reduceBykey works poorlySpark reduceBykey 效果不佳
【发布时间】:2018-01-20 03:27:15
【问题描述】:

我正在用 scala 编写一个关于 Spark 的程序。它用于计算键的数量。这是数据示例:

 名称 水果店
 一个苹果中国
 一个苹果中国
 一个苹果美国
 一根香蕉英国
 B苹果日本
 B橙智利
 C苹果法语

这是一个多列的数据框,但我只关心以上三列,所以可能会有一些重复记录。我想统计一下,比如A吃的水果的产地数量。

val res = data.select("name","fruit","place")
.map(v=>((v.getString(0),v.getString(1)),ArrayBuffer(v.getString(2)))).rdd.reduceByKey((a,b)=>a++=b)
.map(v=>(v._1._1,Map(v._1._2 -> v._2.toSet.size))).reduceByKey((a,b)=>a++=b)

我首先选择我需要的列,然后以("name", "fruit")为key,将每个人吃的每种水果的产地收集到一个ArrayBuffer中。然后我使用“名称”作为关键字,在 {“apple”:2} 这样的地图中收集每个水果的产地数量。所以结果非正式地类似于 RDD[("name",Map("fruit"->"places count"))]。

在程序中,我做了大约 3 次 来计算类似于上述示例的信息。例如,计算一个产地每个人吃的不同水果的数量。

数据大小约为 80GB,我在 50 个执行器上运行该作业。每个执行器有 4 个内核和 24GB 的内存。此外,数据被重新划分为 200 个分区。所以这项工作应该如我所料在很短的时间内完成。但是,我花了一天多的时间来运行该作业并失败,因为 org.apache.spark.shuffle.MetadataFetchFailedException: Missing an output location for shuffle 10java.lang.OutOfMemoryError:超出 GC 开销限制

我做了很多事情来优化这个程序,比如重置 spark.mesos.executor.memoryOverhead 并使用可变映射来最小化频繁创建和清理对象的 GC 成本。我什至尝试使用 reduceByKey 将具有相同键的数据移动到一个分区中以提高性能,但几乎没有帮助。代码如下:

val new_data = data.map(v=>(v.getAs[String]("name"),ArrayBuffer((v.getAs[String]("fruit"),v.getAs[String]("place"))))) 
 .rdd.reduceByKey((a,b)=>a++=b).cache()

然后我不需要在每次进行类似计算时都对数据进行洗牌。而后面的工作可以在new_data的基础上进行。但是,这种优化似乎不起作用。

最后,我发现大约有 50% 的数据在“name”字段上具有相同的值,比如“H”。我删除了名为“H”的数据,工作在 1 小时内完成。

这是我的问题:

  1. 为什么key的分布对reduceByKey的性能影响如此之大?我用“分布”这个词来表示不同键的出现次数。在我的情况下,数据的大小并不大,但是一个键控制了数据,因此性能受到很大影响。我认为这是 reduceByKey 的问题,我错了吗?

  2. 如果我必须保留名称为“H”的记录,如何避免性能问题?

  3. 是否可以使用reduceByKey对数据进行重新分区并将具有相同键(“名称”)的记录放入一个分区中?

  4. 将具有相同键(“名称”)的记录移动到一个分区以提高性能真的有帮助吗?我知道这可能会导致内存问题,但我必须在程序中多次运行类似的代码,所以我想它可能对以后的工作有所帮助。我说的对吗?

感谢您的帮助!

【问题讨论】:

  • 我最好的猜测是,当你通过 key 减少时,所有名为“H”的记录都被发送到一个节点(40GB 的数据!),然后一个节点尝试完成所有工作(大量数据溢出到磁盘上)
  • @bendl 感谢您的评论。是的,我想是的,但即使有 40GB 的数据,执行程序的内存也是 24GB,这意味着它不应该那么慢(超过 1 天)。你有什么想法吗?

标签: scala apache-spark spark-dataframe


【解决方案1】:

为了避免大洗牌,你可以做的是首先做一个从水果到地方的数据框。

val fruitToPlaces = data.groupBy("fruit").agg(collect_set("place").as("places"))

这个数据框应该很小(即适合内存) 你做fruitToPlaces.cache.count以确保它没问题

然后你对水果进行连接。

data.join(fruitToPlaces, Seq("fruit"), "left_outer")

Spark 应该足够聪明,可以进行哈希连接(而不是随机连接)

【讨论】:

  • 感谢您的评论,但它似乎与我想要得到的有点不同。在你的情况下,吃过相同种类水果的人总是会得到相同的位置。然而,这不是真的,有人可能只是从美国而不是其他任何地方获得苹果。
猜你喜欢
  • 1970-01-01
  • 1970-01-01
  • 2015-05-04
  • 1970-01-01
  • 2016-09-15
  • 2016-08-26
  • 2016-02-25
  • 2019-03-05
  • 2015-10-28
相关资源
最近更新 更多