【发布时间】:2018-06-28 13:04:35
【问题描述】:
当最简单的 spark 应用程序似乎两次完成了相同的工作时,我陷入了一种奇怪的境地。
我做了什么
应用程序本身执行查询:
SELECT date, field1, field2, ..., field10
FROM table1
WHERE field1 = <some number>
AND date BETWEEN date('2018-05-01') AND date('2018-05-30')
ORDER BY 1
并将结果存储到 HDFS 中。
表table1是一堆存储在HDFS上的parquet文件,分区如下
/root/date=2018-05-01/hour=0/data-1.snappy.parquet
/root/date=2018-05-01/hour=0/data-2.snappy.parquet
...
/root/date=2018-05-01/hour=1/data-1.snappy.parquet
...
/root/date=2018-05-02/hour=0/data-1.snappy.parquet
...
etc.
所有 parquet 文件的大小从 700M 到 2G,并且具有相同的架构:int 或 bigint 类型的 10 个非空字段。
应用程序的结果很小——只有几千行。
我的 spark 应用程序在 YARN 上以集群模式运行。基础火花参数为
spark.driver.memory=2g
spark.executor.memory=4g
spark.executor.cores=4
spark.dynamicAllocation.enabled=true
spark.shuffle.service.enabled=true
spark.submit.deployMode=cluster
在执行期间,几个容器被抢占,没有发生错误和失败。整个应用程序一次完成。
奇怪的事情
Spark UI 的屏幕截图:
可以看出,第 2 阶段和第 4 阶段都处理了相同数量的输入行,但第 4 阶段也进行了一些洗牌(那些是结果行)。失败的任务是那些容器被抢占的任务。
所以看起来我的应用程序处理了两次相同的文件。
我不知道这怎么可能以及发生了什么。请帮助我理解为什么 Spark 会做如此奇怪的事情。
实际的物理计划:
== Physical Plan ==
Execute InsertIntoHadoopFsRelationCommand InsertIntoHadoopFsRelationCommand hdfs://hadoop/root/tmp/1530123240802-PrQXaOjPoDqCBhfadgrXBiTtfvFrQRlB, false, CSV, Map(path -> /root/tmp/1530123240802-PrQXaOjPoDqCBhfadgrXBiTtfvFrQRlB), Overwrite, [date#10, field1#1L, field0#0L, field3#3L, field2#2L, field5#5, field4#4, field6#6L, field7#7]
+- Coalesce 16
+- *(2) Sort [date#10 ASC NULLS FIRST], true, 0
+- Exchange rangepartitioning(date#10 ASC NULLS FIRST, 200)
+- *(1) Project [date#10, field1#1L, field0#0L, field3#3L, field2#2L, field5#5, field4#4, field6#6L, field7#7]
+- *(1) Filter (isnotnull(field1#1L) && (field1#1L = 1234567890))
+- *(1) FileScan parquet default.table1[field0#0L,field1#1L,field2#2L,field3#3L,field4#4,field5#5,field6#6L,field7#7,date#10,hour#11] Batched: true, Format: Parquet, Location: InMemoryFileIndex[hdfs://hadoop/table1], PartitionCount: 714, PartitionFilters: [(date#10 >= 17652), (date#10 <= 17682)], PushedFilters: [IsNotNull(field1), EqualTo(field1,1234567890)], ReadSchema: struct<field0:bigint,field1:bigint,field2:bigint,field3:bigint,field4:int,field5:int,field6:bigint,field7:...
以下是第 2 阶段和第 4 阶段的 DAG:
【问题讨论】:
-
你能把计划包括进来吗? (使用
df.explain()) -
当然。添加到我冗长的问题的底部。
-
你能分享 DAG 图吗
-
附上几张照片。是否可以在一张图片中获取包含所有阶段的整个应用程序的 DAG?
-
@MaxZavyrylin Spark 启动一项工作以执行模式推断,但它仅针对数据样本。只是为了消除这个原因,我们可以使用
spark.sql.hive.caseSensitiveInferenceMode=false运行这项工作
标签: apache-spark