【Posted】: 2019-10-28 03:44:58
【Question】:
I am using Spark in local mode from an IDE (Eclipse).
In the Spark UI I can see that many jobs are created for a simple aggregation. Why?
import org.apache.spark.sql.SparkSession

trait SparkSessionWrapper {
  lazy val spark: SparkSession = {
    SparkSession
      .builder()
      .master("local[2]")
      .appName("Spark Me")
      .getOrCreate()
  }
  spark.sparkContext.setLogLevel("WARN")
}
The Spark application is as follows:
object RowNumberCalc
  extends App
  with SparkSessionWrapper {

  import spark.implicits._

  val cityDf = Seq(
    ("London", "Harish", 5500, "2019-10-01"),
    ("NYC", "RAJA", 11121, "2019-10-01"),
    ("SFO", "BABU", 77000, "2019-10-01"),
    ("London", "Rick", 7500, "2019-09-01"),
    ("NYC", "Jenna", 6511, "2019-09-01"),
    ("SFO", "Richard", 234567, "2019-09-01"),
    ("London", "Harish", 999999, "2019-08-01"),
    ("NYC", "Sam", 1234, "2019-08-01"),
    ("SFO", "Dylan", 45678, "2019-08-01")).toDF("city", "name", "money", "month_id")

  cityDf.createOrReplaceTempView("city_table")

  val totalMoneySql =
    """
      |select city, sum(money) from city_table group by 1 """.stripMargin

  spark.sql(totalMoneySql).show(false)

  // Block on stdin so the application, and with it the Spark UI, stays up for inspection.
  System.in.read
  spark.stop()
}
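As shown, this is a simple calculation of the total money per city. Yet the Spark UI shows 5 jobs, each with 2 stages!
The SQL tab also shows 5 jobs.
The physical plan, however, shows the stage split I would expect: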
== Physical Plan ==
CollectLimit 21
+- *(2) LocalLimit 21
   +- *(2) HashAggregate(keys=[city#9], functions=[sum(cast(money#11 as bigint))], output=[city#9, sum(money)#24])
      +- Exchange hashpartitioning(city#9, 200)
         +- *(1) HashAggregate(keys=[city#9], functions=[partial_sum(cast(money#11 as bigint))], output=[city#9, sum#29L])
            +- LocalTableScan [city#9, money#11]
Where and how are the 5 jobs being triggered?
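One plausible explanation, not confirmed anywhere in this post, is that `show()` fetches its rows through a limit (the `CollectLimit 21` node in the plan) and that Spark evaluates such a limit incrementally: it first runs a job over a single partition of the 200 shuffle partitions, and if that does not yield enough rows it runs further jobs over progressively more partitions. The sketch below is plain Scala with no Spark dependency and only models that arithmetic. The names `TakeJobEstimate` and `partitionsPerJob` are hypothetical, the growth factor of 4 is assumed to match the default of `spark.sql.limit.scaleUpFactor`, and it assumes the limit of 21 rows is never reached (the query returns only 3 cities), so every partition ends up being scanned.

```scala
object TakeJobEstimate {

  /** Number of partitions scanned by each successive job, assuming the
    * limit is never satisfied and each job scans `scaleUpFactor` times
    * the number of partitions scanned so far (the first job scans one).
    * This is a model of the assumed behaviour, not Spark's own code.
    */
  def partitionsPerJob(totalParts: Int, scaleUpFactor: Int = 4): List[Int] = {
    // Assumption: the effective factor is never smaller than 2.
    val factor = math.max(scaleUpFactor, 2)

    @annotation.tailrec
    def loop(scanned: Int, acc: List[Int]): List[Int] =
      if (scanned >= totalParts) acc.reverse
      else {
        val wanted = if (scanned == 0) 1 else scanned * factor
        val taken  = math.min(wanted, totalParts - scanned)
        loop(scanned + taken, taken :: acc)
      }

    loop(0, Nil)
  }

  def main(args: Array[String]): Unit = {
    val jobs = partitionsPerJob(totalParts = 200)
    println(jobs)      // List(1, 4, 20, 100, 75)
    println(jobs.size) // 5
  }
}
```

Under these assumptions, 200 shuffle partitions give exactly 5 jobs, which matches the count seen in the UI. If the model is right, lowering `spark.sql.shuffle.partitions` (for example to 1) or calling `collect()` instead of `show()` should reduce the number of jobs; that would be a quick way to check it.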
【Discussion】:
Tags: apache-spark apache-spark-sql