【发布时间】:2020-07-29 18:31:26
【问题描述】:
我有一个数据框并使用该数据框运行多次迭代。
val raw = getDataframe() // <-- the is Stage 3 operation.
val df = raw.repartition(2000, col("id")) // <-- start stage 4
.dropDuplicates(Seq("id"))
.persist(StorageLevel.MEMORY_AND_DISK_SER)
// do iterative jobs..
// job2
// job3
// job4
// ...
工作 2
工作 3
问题
- Spark 仅按阶段跳过?但不适用于操作级别的任务?
- 第 4 阶段的SortAggregates 可能是
dropDuplicate操作。如何只删除一次重复项? -
raw.count()会在dropDuplicate()之后将一个阶段一分为二,dropDuplicate会从第二次迭代中跳过吗?
更新 1
我在persist后添加了count()。
val df = raw.repartition(2000, col("id")) // <-- start stage 4
.dropDuplicates(Seq("id"))
.persist(StorageLevel.MEMORY_AND_DISK_SER)
df.count()
df.explain(true) <-- plan A
// do iterative jobs..
// job2
// iterativejob.explain(true) <-- plan B
迭代作业示例
迭代作业是 cubing sql。类似的东西。
df.createOrReplaceTempView("tb")
dataframes = dataframes :+ sqlContext.sql(
s"""
| SELECT
| d_scene_id, d_action_id, d_classifier,
| count(*) m_event,
| approx_count_distinct(user_key, ${rsd}) m_user,
| approx_count_distinct(device_id, ${rsd}) m_device,
| 3 dimension_count
| FROM tb
| GROUP BY d_scene_id, d_action_id, d_classifier
|""".stripMargin)
再次比较计划
然后在这里比较了两个物理计划。 https://github.com/jeesim2/test/pull/1/files?diff=split&w=1
左边是A的计划,右边是B的计划。
正如您所见,之前和包括 InMemoryRelation(cache) 的树木在两侧完全相同。
+- InMemoryRelation [event_time#13, scene_id#14, action_id#15, classifier#16, event_hash#0, user_key#3, device_id#11, product#4, country#77, os_name#157, app_ver#117, p0name#197, p0value#597, p1name#237, p1value#637, p2name#277, p2value#677, p3name#317, p3value#717, p4name#357, p4value#757, p5name#397, p5value#797, p6name#437, ... 7 more fields], true, 10000, StorageLevel(disk, memory, 1 replicas)
那么,为什么SortAggregate(key=[event_hash#0])(dropDuplicates('event_hash')) 每次迭代都重新计算?
【问题讨论】:
-
您是否尝试过只运行 df.count 而没有迭代作业?这样您就可以比较逻辑计划的哪些部分发生在持久操作之后。持久化/缓存将实现 df 直到您调用持久化操作并截断本地计划,这将限制催化剂优化的范围。您能否也分享一些您的迭代作业的代码,以便更好地推理图表?
-
@milos 我更新了一些。请再看一遍好吗?
标签: apache-spark apache-spark-sql