【问题标题】:Why did Spark processed the same data twice?为什么 Spark 两次处理相同的数据?
【发布时间】:2018-06-28 13:04:35
【问题描述】:

当最简单的 spark 应用程序似乎两次完成了相同的工作时,我陷入了一种奇怪的境地。

我做了什么

应用程序本身执行查询:

SELECT date, field1, field2, ..., field10
FROM table1
WHERE field1 = <some number>
  AND date BETWEEN date('2018-05-01') AND date('2018-05-30')
  ORDER BY 1

并将结果存储到 HDFS 中。

table1是一堆存储在HDFS上的parquet文件,分区如下

/root/date=2018-05-01/hour=0/data-1.snappy.parquet
/root/date=2018-05-01/hour=0/data-2.snappy.parquet
...
/root/date=2018-05-01/hour=1/data-1.snappy.parquet
...
/root/date=2018-05-02/hour=0/data-1.snappy.parquet
...
etc.

所有 parquet 文件的大小从 700M 到 2G,并且具有相同的架构:intbigint 类型的 10 个非空字段。

应用程序的结果很小——只有几千行。

我的 spark 应用程序在 YARN 上以集群模式运行。基础火花参数为

spark.driver.memory=2g
spark.executor.memory=4g
spark.executor.cores=4
spark.dynamicAllocation.enabled=true
spark.shuffle.service.enabled=true
spark.submit.deployMode=cluster

在执行期间,几个容器被抢占,没有发生错误和失败。整个应用程序一次完成。

奇怪的事情

Spark UI 的屏幕截图:

可以看出,第 2 阶段和第 4 阶段都处理了相同数量的输入行,但第 4 阶段也进行了一些洗牌(那些是结果行)。失败的任务是那些容器被抢占的任务。

所以看起来我的应用程序处理了两次相同的文件。

我不知道这怎么可能以及发生了什么。请帮助我理解为什么 Spark 会做如此奇怪的事情。

实际的物理计划:

== Physical Plan ==
Execute InsertIntoHadoopFsRelationCommand InsertIntoHadoopFsRelationCommand hdfs://hadoop/root/tmp/1530123240802-PrQXaOjPoDqCBhfadgrXBiTtfvFrQRlB, false, CSV, Map(path -> /root/tmp/1530123240802-PrQXaOjPoDqCBhfadgrXBiTtfvFrQRlB), Overwrite, [date#10, field1#1L, field0#0L, field3#3L, field2#2L, field5#5, field4#4, field6#6L, field7#7]
+- Coalesce 16
   +- *(2) Sort [date#10 ASC NULLS FIRST], true, 0
      +- Exchange rangepartitioning(date#10 ASC NULLS FIRST, 200)
         +- *(1) Project [date#10, field1#1L, field0#0L, field3#3L, field2#2L, field5#5, field4#4, field6#6L, field7#7]
            +- *(1) Filter (isnotnull(field1#1L) && (field1#1L = 1234567890))
               +- *(1) FileScan parquet default.table1[field0#0L,field1#1L,field2#2L,field3#3L,field4#4,field5#5,field6#6L,field7#7,date#10,hour#11] Batched: true, Format: Parquet, Location: InMemoryFileIndex[hdfs://hadoop/table1], PartitionCount: 714, PartitionFilters: [(date#10 >= 17652), (date#10 <= 17682)], PushedFilters: [IsNotNull(field1), EqualTo(field1,1234567890)], ReadSchema: struct<field0:bigint,field1:bigint,field2:bigint,field3:bigint,field4:int,field5:int,field6:bigint,field7:...

以下是第 2 阶段和第 4 阶段的 DAG:

【问题讨论】:

  • 你能把计划包括进来吗? (使用df.explain()
  • 当然。添加到我冗长的问题的底部。
  • 你能分享 DAG 图吗
  • 附上几张照片。是否可以在一张图片中获取包含所有阶段的整个应用程序的 DAG?
  • @MaxZavyrylin Spark 启动一项工作以执行模式推断,但它仅针对数据样本。只是为了消除这个原因,我们可以使用spark.sql.hive.caseSensitiveInferenceMode=false 运行这项工作

标签: apache-spark


【解决方案1】:

我遇到了同样的问题,结果证明这种行为是完全正常的。

我在一个 Spark 作业中观察到这种行为,该作业只是从 HDFS 读取数据,进行一些轻量级处理,并在写回 HDFS 之前使用orderBy 方法对列进行排序。在 Spark UI 中,我看到两个作业将扫描整个 6 TB 表,就像您所做的那样。第一个作业使用的内存很少,没有写入任何 shuffle 记录,也没有向 HDFS 写入任何记录。

事实证明,根本原因是在实际对数据进行排序之前,Spark 执行了一个采样操作,帮助它定义了一个RangePartitioner,它用于为排序算法对数据进行分区:它需要知道定义排序键的列中数据的大致范围来定义一个好的RangePartitioner

此操作在此博客中有所提及:

https://blog.cloudera.com/blog/2015/01/improving-sort-performance-in-apache-spark-its-a-double/

这篇 StackOverflow 帖子:

How does Spark achieve sort order?

还有霍顿·卡劳 (Holden Karau) 和雷切尔·沃伦 (Rachel Warran) 所著的伟大著作《高性能火花》(High Performance Spark),第 1 页。 143.

就我而言,我知道键的范围,所以我想到原则上我应该能够定义一个RangePartitioner先验。但是,我在 Spark 源代码中挖掘了它的 sort 方法,但没有找到任何可以显式传递范围的解决方法。

【讨论】:

  • 上面的 DAG 显示的是数据被完全读取了一遍,而不是对数据的百分比进行采样。这是预期的吗?另外,我认为采样发生在单独的工作中,而不是单独的阶段。如果我错了,请纠正我
  • @DaRkMaN 这是正确的和预期的:RangePartitioner 正在通过 Partitioner.scala 中的 sketch 方法实现储层采样。为了生成真正随机的样本,必须读取所有元素。想象一下,如果您有一列包含 100 个从 1 到 100 的整数,您想要对已经按升序排序的数据进行排序,并且只读取该数据的前 10%。您只会读取一些最小值 (1-11),因此您估计的“范围”不正确。
【解决方案2】:

我仍然不确定为什么 spark 会这样,我仍在挖掘,但我已经设法了解 发生了什么

注意:我的 SQL 以 ORDER 结尾。由于预计该作业将返回很少的行,因此我认为最终排序应该是一件容易的事。

因此,当我删除 ORDER 子句时,我的查询会按预期运行并且只读取一次镶木地板。无论数据集有多大以及在执行过程中任务被抢占多少次,这种奇怪的行为都是可重现的:添加 order 子句会导致 spark 扫描整个数据集两次(至少看起来像这样) .

我忘了提一下:我使用的是 Hortonworks 发行版 (HDP-2.6.5) 中的 spark 2.3.0。

【讨论】:

    猜你喜欢
    • 2021-08-03
    • 2015-12-09
    • 1970-01-01
    • 2022-01-26
    • 2015-12-09
    • 2014-10-09
    • 1970-01-01
    相关资源
    最近更新 更多