【发布时间】:2017-08-17 16:31:16
【问题描述】:
我们正在运行下一个阶段的 DAG,并且对于相对较小的 shuffle 数据大小(每个任务大约 19MB)经历了较长的 shuffle 读取时间
一个有趣的方面是每个执行器/服务器中的等待任务具有相同的随机读取时间。以下是其含义的示例:对于以下服务器,一组任务等待大约 7.7 分钟,另一组等待大约 26 秒。
这是同一阶段运行的另一个示例。该图显示了 3 个执行器/服务器,每个执行器/服务器具有相同的任务组,具有相同的随机读取时间。蓝色组代表由于推测执行而终止的任务:
并不是所有的执行者都这样。有些任务几乎可以在几秒钟内完成所有任务,并且这些任务的远程读取数据大小与在其他服务器上等待很长时间的任务相同。 此外,这种类型的阶段在我们的应用程序运行时中运行了 2 次。产生这些具有大量随机读取时间的任务组的服务器/执行器在每个阶段运行中都是不同的。
以下是其中一个服务器/主机的任务统计表示例:
看起来负责这个 DAG 的代码如下:
output.write.parquet("output.parquet")
comparison.write.parquet("comparison.parquet")
output.union(comparison).write.parquet("output_comparison.parquet")
val comparison = data.union(output).except(data.intersect(output)).cache()
comparison.filter(_.abc != "M").count()
我们非常感谢您对此的看法。
【问题讨论】:
-
奇怪。代码和数据样本将不胜感激。我看到 DAG 的每一步都有一个缓存调用,你在缓存所有内容吗?
-
你好。谢谢你的问题。我在上面的描述中发布了代码。我们仅在认为需要时才进行缓存。
-
except 和 intersect 调用在我的关注范围内。你的 DAG 引用了一个 sortmergejoin;您是否已经知道是哪条线路造成了问题?
-
我们认为 sortmergejoin 来自上述代码中的 except 或 intersect。另一条信息是我们正在使用 MesosExternalShuffleService
标签: scala apache-spark shuffle