【问题标题】:Cross Join in Apache Spark with dataset is very slow使用数据集的 Apache Spark 中的交叉连接非常慢
【发布时间】:2019-07-09 20:36:49
【问题描述】:

我已经在 spark 用户论坛上发布了这个问题,但没有收到任何回复,所以在这里再次询问。

我们有一个用例,我们需要进行笛卡尔连接,但由于某种原因,我们无法使其与 Dataset API 一起使用。

我们有两个数据集:

  • 一个包含 2 个字符串列的数据集,例如 c1、c2。这是一个包含约 100 万条记录的小型数据集。这两列都是 32 个字符的字符串,因此应该小于 500 mb。

    我们广播这个数据集

  • 另一个数据集稍大,有大约 1000 万条记录
val ds1 = spark.read.format("csv").option("header", "true").load(<s3-location>).select("c1", "c2")
ds1.count
val ds2 = spark.read.format("csv").load(<s3-location>).toDF("c11", "c12", "c13", "c14", "c15", "ts")
ds2.count
ds2.crossJoin(broadcast(ds1)).filter($"c1" <= $"c11" && $"c11" <= $"c2").count

如果我使用 RDD api 实现它,我在 ds1 中广播数据,然后在 ds2 中过滤数据,它可以正常工作。

我已经确认广播成功了。

2019-02-14 23:11:55 INFO CodeGenerator:54 - 在 10.469136 毫秒内生成的代码 2019-02-14 23:11:55 INFO TorrentBroadcast:54 - 开始读取广播变量 29 2019-02-14 23:11:55 INFO TorrentBroadcast:54 - 读取广播变量 29 耗时 6 毫秒 2019-02-14 23:11:56 INFO CodeGenerator:54 - 在 11.280087 毫秒内生成的代码

查询计划:

== 物理计划 ==
BroadcastNestedLoopJoin BuildRight, Cross, ((c1#68 :- *项目 []
: +- *Filter isnotnull(_c0#0)
: +- *FileScan csv [_c0#0,_c1#1,_c2#2,_c3#3,_c4#4,_c5#5] 批处理:false,格式:CSV,位置:InMemoryFileIndex[],PartitionFilters:[], PushedFilters: [IsNotNull(_c0)], ReadSchema: struct
+- BroadcastExchange IdentityBroadcastMode
+- *项目 [c1#68, c2#69]
+- *Filter (isnotnull(c1#68) && isnotnull(c2#69))
+- *FileScan csv [c1#68,c2#69] 批处理:false,格式:CSV,位置:InMemoryFileIndex[],PartitionFilters:[],PushedFilters:[IsNotNull(c1),IsNotNull(c2)],ReadSchema:结构

那么这个阶段就没有进展了。

我更新了代码以使用广播 ds1,然后在 ds2 的 mapPartitions 中加入。

val ranges = spark.read.format("csv").option("header", "true").load(<s3-location>).select("c1", "c2").collect
val rangesBC = sc.broadcast(ranges)

然后在 mapPartitions 方法中使用这个 rangeBC 来识别 ds2 中每一行所属的范围,这个作业在 3 小时内完成,而另一个作业即使在 24 小时后也没有完成。这种暗示意味着查询优化器没有做我想做的事。

我做错了什么?任何指针都会有所帮助。谢谢!

【问题讨论】:

  • 添加说明供我们查看。让我印象深刻的是 10M x 1M 可能需要一段时间
  • 但是因为 ds1 是广播它不应该花费这么多时间。它还可以在不到 10 分钟的时间内使用基于 RDD 的 API。我也更新了查询计划。
  • 是的。我现在数一数。
  • 我看到太晚了。
  • 试过了,但速度极慢。

标签: apache-spark join apache-spark-dataset cross-join


【解决方案1】:

我不知道您是在裸机还是 AWS 上使用现货、按需或专用,还是使用 AZURE 等虚拟机。我的看法:

  • 感谢 10M x 1M 的工作量很大,即使 .filter 应用于生成的交叉连接。这需要一些时间。您的期望是什么?
  • Spark 就是以线性方式进行缩放。
  • 具有虚拟机的数据中心没有专用的,因此没有最快的性能。

然后:

  • 我在 Databricks 10M x 100K 上以 0.86 内核和 6GB Driver for Community Edition 的模拟设置运行。运行时间为 17 分钟。
  • 我在一个 4 节点 AWS EMR 非专用集群上运行了您示例中的 10M x 1M(有一些 EMR 奇怪的东西,例如在有价值的实例上保留驱动程序!)部分完成需要 3 个小时。见下图。

所以,回答你的问题: - 你没有做错任何事。

  • 只需要更多的资源来实现更多的并行化。
  • 如您所见,我确实添加了一些显式分区。

【讨论】:

  • 我已经更新了我的帖子,如果我收集和广播 ds1,我不认为这是资源问题。
  • @Ankur 看到 100k 它完成得很快。
  • @Ankur 确实很奇怪。 2.4 Spark,你呢?
  • 我使用的是 Spark 2.1.1。将尝试使用 Spark 2.4。
  • @Ankur 我今天早上发帖后重新跑了。我注意到在 EMR 集群上它也停止了。它比显示的要远一些,所以有些可疑。它只是第一次测试和愚蠢逻辑的 10 倍。如果有 15 个执行者,它应该会更快完成。
【解决方案2】:

我最近遇到了这个问题,发现 Spark 在交叉连接大型数据帧时会出现奇怪的分区行为。如果您的输入数据框包含几百万条记录,则交叉连接的数据框的分区等于输入数据框分区的乘积,即

crossJoinDF 的分区 = (ds1 的分区) * (ds2 的分区)。

如果 ds1 或 ds2 包含大约几百个分区,则交叉连接数据帧的分区范围约为 10,000。这些分区太多了,这会导致管理许多小任务的开销过大,从而使交叉连接数据帧上的任何计算(在您的情况下 - 过滤器)的运行速度非常慢。

那么如何让计算更快呢?首先检查这是否确实是您的问题的问题:

scala> val crossJoinDF = ds2.crossJoin(ds1)
# This should return immediately because of spark lazy evaluation

scala> val crossJoinDFPartitions = crossJoinDF.rdd.partitions.size

检查交叉连接数据帧上的分区数。如果 crossJoinDFPartitions > 10,000,那么你确实有同样的问题,即交叉连接的数据框有太多的分区。

为了使您对交叉连接数据帧的操作更快,请减少输入数据帧上的分区数。例如:

scala> val ds1 = ds1.repartition(40)
scala> ds1.rdd.partitions.size 
res80: Int = 40

scala> val ds2 = ds2.repartition(40)
scala> ds2.rdd.partitions.size 
res81: Int = 40

scala> val crossJoinDF = ds1.crossJoin(ds2)
scala> crossJoinDF.rdd.partitions.size 
res82: Int = 1600

scala> crossJoinDF.count()

count() 操作应导致执行交叉连接。现在应该在合理的时间内返回计数。您选择的确切分区数取决于集群中可用的核心数。

这里的关键是确保您的交叉连接数据框具有合理数量的分区(this post 有用,它更详细地解释了这个问题。

【讨论】:

  • 欢迎您,感谢您的贡献。这是一个非常有用的答案。为确保您不会与 Stack Overflow’s self-promotion policy 发生冲突,请务必确认您是所链接博客文章的作者。
  • 我承认我是我在回答中链接的博客文章的作者。谢谢!
猜你喜欢
  • 1970-01-01
  • 2019-06-06
  • 1970-01-01
  • 1970-01-01
  • 2021-11-11
  • 2015-02-08
  • 1970-01-01
  • 1970-01-01
  • 2016-07-07
相关资源
最近更新 更多