【问题标题】:Spark rdd cached but not reusedSpark rdd 缓存但未重用
【发布时间】:2020-07-29 18:31:26
【问题描述】:

我有一个数据框并使用该数据框运行多次迭代。

val raw = getDataframe() // <-- the is Stage 3 operation.

val df = raw.repartition(2000, col("id")) // <-- start stage 4
            .dropDuplicates(Seq("id"))
            .persist(StorageLevel.MEMORY_AND_DISK_SER)

// do iterative jobs..
// job2
// job3
// job4
// ...

工作 2

  • 在第 4 阶段有一个缓存点 (mapPartitionsInternal)
  • 由于这是第一次迭代,所有阶段和操作都需要进行操作。

工作 3

  • 第 3 阶段已跳过
  • 但是,第 4 阶段的前 5 个蓝色框不会被跳过,即使那些是从 Job2 中缓存的

问题

  • Spark 仅按阶段跳过?但不适用于操作级别的任务?
  • 第 4 阶段的SortAggregates 可能是dropDuplicate 操作。如何只删除一次重复项?
  • raw.count() 会在dropDuplicate() 之后将一个阶段一分为二,dropDuplicate 会从第二次迭代中跳过吗?

更新 1

我在persist后添加了count()。

val df = raw.repartition(2000, col("id")) // <-- start stage 4
            .dropDuplicates(Seq("id"))
            .persist(StorageLevel.MEMORY_AND_DISK_SER)

df.count()
df.explain(true) <-- plan A

// do iterative jobs..
// job2
// iterativejob.explain(true) <-- plan B

迭代作业示例

迭代作业是 cubing sql。类似的东西。

    df.createOrReplaceTempView("tb")

    dataframes = dataframes :+ sqlContext.sql(
      s"""
         | SELECT
         |   d_scene_id, d_action_id, d_classifier,
         |   count(*) m_event,
         |   approx_count_distinct(user_key, ${rsd}) m_user,
         |   approx_count_distinct(device_id, ${rsd}) m_device,
         |   3 dimension_count
         | FROM tb
         | GROUP BY d_scene_id, d_action_id, d_classifier
         |""".stripMargin)

再次比较计划

然后在这里比较了两个物理计划。 https://github.com/jeesim2/test/pull/1/files?diff=split&w=1

左边是A的计划,右边是B的计划。

正如您所见,之前和包括 InMemoryRelation(cache) 的树木在两侧完全相同。

+- InMemoryRelation [event_time#13, scene_id#14, action_id#15, classifier#16, event_hash#0, user_key#3, device_id#11, product#4, country#77, os_name#157, app_ver#117, p0name#197, p0value#597, p1name#237, p1value#637, p2name#277, p2value#677, p3name#317, p3value#717, p4name#357, p4value#757, p5name#397, p5value#797, p6name#437, ... 7 more fields], true, 10000, StorageLevel(disk, memory, 1 replicas)

那么,为什么SortAggregate(key=[event_hash#0])(dropDuplicates('event_hash')) 每次迭代都重新计算?

【问题讨论】:

  • 您是否尝试过只运行 df.count 而没有迭代作业?这样您就可以比较逻辑计划的哪些部分发生在持久操作之后。持久化/缓存将实现 df 直到您调用持久化操作并截断本地计划,这将限制催化剂优化的范围。您能否也分享一些您的迭代作业的代码,以便更好地推理图表?
  • @milos 我更新了一些。请再看一遍好吗?

标签: apache-spark apache-spark-sql


【解决方案1】:

在我看来,一切正常。 由于 approx_count_distinct 在不同的列上,因此计划中有两个 sortAgregates 阶段。

您的缓存发生在 WholeStageCodegen 之后,该 WholeStageCodegen 对应于列上的重新分区并在同一列上删除重复项。交换对应将这些分区交换给对应的执行者。

此时,这些新分区被缓存在执行程序中,当迭代作业 2 被触发时,这些阶段将被跳过。

所以要在圈出的阶段回答“这是重新计算过的吗”,答案是肯定的,但基于所提供的应该发生的信息。

【讨论】:

    猜你喜欢
    • 1970-01-01
    • 2019-07-08
    • 2021-03-29
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2021-01-01
    • 1970-01-01
    相关资源
    最近更新 更多