【发布时间】:2019-07-09 20:36:49
【问题描述】:
我已经在 spark 用户论坛上发布了这个问题,但没有收到任何回复,所以在这里再次询问。
我们有一个用例,我们需要进行笛卡尔连接,但由于某种原因,我们无法使其与 Dataset API 一起使用。
我们有两个数据集:
- 一个包含 2 个字符串列的数据集,例如 c1、c2。这是一个包含约 100 万条记录的小型数据集。这两列都是 32 个字符的字符串,因此应该小于 500 mb。
我们广播这个数据集
- 另一个数据集稍大,有大约 1000 万条记录
val ds1 = spark.read.format("csv").option("header", "true").load(<s3-location>).select("c1", "c2")
ds1.count
val ds2 = spark.read.format("csv").load(<s3-location>).toDF("c11", "c12", "c13", "c14", "c15", "ts")
ds2.count
ds2.crossJoin(broadcast(ds1)).filter($"c1" <= $"c11" && $"c11" <= $"c2").count
如果我使用 RDD api 实现它,我在 ds1 中广播数据,然后在 ds2 中过滤数据,它可以正常工作。
我已经确认广播成功了。
2019-02-14 23:11:55 INFO CodeGenerator:54 - 在 10.469136 毫秒内生成的代码 2019-02-14 23:11:55 INFO TorrentBroadcast:54 - 开始读取广播变量 29 2019-02-14 23:11:55 INFO TorrentBroadcast:54 - 读取广播变量 29 耗时 6 毫秒 2019-02-14 23:11:56 INFO CodeGenerator:54 - 在 11.280087 毫秒内生成的代码
查询计划:
== 物理计划 ==
BroadcastNestedLoopJoin BuildRight, Cross, ((c1#68 :- *项目 []
: +- *Filter isnotnull(_c0#0)
: +- *FileScan csv [_c0#0,_c1#1,_c2#2,_c3#3,_c4#4,_c5#5] 批处理:false,格式:CSV,位置:InMemoryFileIndex[],PartitionFilters:[], PushedFilters: [IsNotNull(_c0)], ReadSchema: struct
+- BroadcastExchange IdentityBroadcastMode
+- *项目 [c1#68, c2#69]
+- *Filter (isnotnull(c1#68) && isnotnull(c2#69))
+- *FileScan csv [c1#68,c2#69] 批处理:false,格式:CSV,位置:InMemoryFileIndex[],PartitionFilters:[],PushedFilters:[IsNotNull(c1),IsNotNull(c2)],ReadSchema:结构
那么这个阶段就没有进展了。
我更新了代码以使用广播 ds1,然后在 ds2 的 mapPartitions 中加入。
val ranges = spark.read.format("csv").option("header", "true").load(<s3-location>).select("c1", "c2").collect
val rangesBC = sc.broadcast(ranges)
然后在 mapPartitions 方法中使用这个 rangeBC 来识别 ds2 中每一行所属的范围,这个作业在 3 小时内完成,而另一个作业即使在 24 小时后也没有完成。这种暗示意味着查询优化器没有做我想做的事。
我做错了什么?任何指针都会有所帮助。谢谢!
【问题讨论】:
-
添加说明供我们查看。让我印象深刻的是 10M x 1M 可能需要一段时间
-
但是因为 ds1 是广播它不应该花费这么多时间。它还可以在不到 10 分钟的时间内使用基于 RDD 的 API。我也更新了查询计划。
-
是的。我现在数一数。
-
我看到太晚了。
-
试过了,但速度极慢。
标签: apache-spark join apache-spark-dataset cross-join