【问题标题】:Why does Spark run 5 jobs for a simple aggregation?为什么 Spark 为一个简单的聚合运行 5 个作业?
【发布时间】:2019-10-28 03:44:58
【问题描述】:

我在 IDE/eclipse 中以 local 模式使用 Spark。

我可以看到 Spark UI 为简单的聚合创建了许多作业。为什么?

import org.apache.spark.sql.SparkSession

trait SparkSessionWrapper {

  lazy val spark: SparkSession = {
    SparkSession
      .builder()
      .master("local[2]")
      .appName("Spark Me")
      .getOrCreate()
  }

  spark.sparkContext.setLogLevel("WARN")

} 

Spark应用如下:

object RowNumberCalc
  extends App
  with SparkSessionWrapper {

  import spark.implicits._

  val cityDf = Seq(
    ("London", "Harish",5500,"2019-10-01"),
    ("NYC","RAJA",11121,"2019-10-01"),
    ("SFO","BABU",77000,"2019-10-01"),
    ("London","Rick",7500,"2019-09-01"),
    ("NYC","Jenna",6511,"2019-09-01"),
    ("SFO","Richard",234567,"2019-09-01"),
    ("London","Harish",999999,"2019-08-01"),
    ("NYC","Sam",1234,"2019-08-01"),
    ("SFO","Dylan",45678,"2019-08-01")).toDF("city","name","money","month_id")

  cityDf.createOrReplaceTempView("city_table")
  val totalMoneySql =
    """
      |select city, sum(money) from city_table group by 1 """.stripMargin
  spark.sql(totalMoneySql).show(false)


  System.in.read
  spark.stop()

}

如图所示,每个城市的资金总额的简单计算 现在 SPARK-UI 显示 ==> 5 个工作,每个工作有 2 个阶段 !!!

SQL标签也显示了5个作业。

物理计划显示正确的阶段划分

== Physical Plan ==
CollectLimit 21
+- *(2) LocalLimit 21
   +- *(2) HashAggregate(keys=[city#9], functions=[sum(cast(money#11 as bigint))], output=[city#9, sum(money)#24])
      +- Exchange hashpartitioning(city#9, 200)
         +- *(1) HashAggregate(keys=[city#9], functions=[partial_sum(cast(money#11 as bigint))], output=[city#9, sum#29L])
            +- LocalTableScan [city#9, money#11]

从哪里/如何触发 5 个工作 ???

【问题讨论】:

    标签: apache-spark apache-spark-sql


    【解决方案1】:

    tl;dr 在默认的 200 个分区和 5 个 Spark 作业中,您可以使用很少的行(9 个作为主要输入和 3 个聚合)以满足 @ 的要求987654321@ 显示 20 行。


    换句话说,您所体验的是Dataset.show-specific(顺便说一句,这不适用于大型数据集,不是吗?)

    默认情况下Dataset.show 显示 20 行。它从 1 个分区开始,最多占用 20 行。如果没有足够的行,它将乘以 4(如果我没记错的话)并扫描其他 4 个分区以查找丢失的行。直到收集到 20 行为止。

    最后HashAggregate的输出行数为3行。

    根据这 3 行在 Spark 中的分区,可以运行一个、两个或多个作业。它在很大程度上取决于行的哈希值(每个 HashPartitioner)。


    如果您真的想查看此行数(输入为 9)的单个 Spark 作业,请使用 spark.sql.shuffle.partitions 配置属性作为 1 启动 Spark 应用程序。

    这将使聚合后的计算使用 1 个分区,并将所有结果行放在一个分区中。

    【讨论】:

    • 一个具体而琐碎的用例。我不得不说阅读 DAG 很难。在这种情况下,跳过的阶段也令人困惑。无论如何,我们现在都知道了。
    猜你喜欢
    • 2015-12-21
    • 1970-01-01
    • 2016-10-11
    • 1970-01-01
    • 1970-01-01
    • 2020-10-18
    • 2018-06-05
    • 1970-01-01
    • 2019-02-27
    相关资源
    最近更新 更多