【问题标题】:Why spark count action has executed in three stages为什么火花计数行动分三个阶段执行
【发布时间】:2019-11-07 02:26:29
【问题描述】:

我已经加载了一个 csv 文件。将其重新分区为 4,然后对 DataFrame 进行计数。当我查看 DAG 时,我看到此操作分 3 个阶段执行。

为什么这个简单的动作要分 3 个阶段执行。我想第一阶段是加载文件,第二阶段是找到每个分区的计数。

那么第三阶段发生了什么?

这是我的代码

val sample = spark.read.format("csv").option("header", "true").option("inferSchema", "true").option("delimiter", ";").load("sample_data.csv")

sample.repartition(4).count()

【问题讨论】:

  • inferSchema 设置为 true 会导致 Spark 对数据进行一次额外的传递以发现架构。
  • @HristoIliev 即使没有 inferSchema 配置,您也将获得 3 个阶段。请检查我的答案
  • @VB_ 我明白了。很好的解释。

标签: apache-spark apache-spark-sql


【解决方案1】:
  1. 第一阶段 = 读取文件。由于重新分区(因为它是需要改组的广泛转换),它不能加入到带有 partial_count 的单个阶段(第二阶段)

  2. 第二阶段 = 本地计数(计算每个分区的计数)

  3. 第三阶段 = 在驱动程序上聚合结果。

Spark 生成每个动作或广泛转换的单独阶段。要了解有关窄/宽转换以及为什么宽转换需要单独阶段的更多详细信息,请查看 "Wide Versus Narrow Dependencies, High Performance Spark, Holden Karau"this article

让我们在本地测试这个假设。首先你需要创建一个数据集:

数据集/test-data.json

[
  { "key":  1, "value":  "a" },
  { "key":  2, "value":  "b" },
  { "key":  3, "value":  "c" },
  { "key":  4, "value":  "d" },
  { "key":  5, "value":  "e" },
  { "key":  6, "value":  "f" },
  { "key":  7, "value":  "g" },
  { "key":  8, "value":  "h" }
]

然后运行以下代码:

    StructType schema = new StructType()
            .add("key", DataTypes.IntegerType)
            .add("value", DataTypes.StringType);

    SparkSession session = SparkSession.builder()
            .appName("sandbox")
            .master("local[*]")
            .getOrCreate();

    session
            .read()
            .schema(schema)
            .json("file:///C:/<you_path>/dataset")
            .repartition(4) // comment on the second run
            .registerTempTable("df");

    session.sqlContext().sql("SELECT COUNT(*) FROM df").explain();

输出将是:

== Physical Plan ==
*(3) HashAggregate(keys=[], functions=[count(1)])
+- Exchange SinglePartition
   +- *(2) HashAggregate(keys=[], functions=[partial_count(1)])
      +- Exchange RoundRobinPartitioning(4)
         +- *(1) FileScan json [] Batched: false, Format: JSON, Location: InMemoryFileIndex[file:/C:/Users/iaroslav/IdeaProjects/sparksandbox/src/main/resources/dataset], PartitionFilters: [], PushedFilters: [], ReadSchema: struct<>

但如果您注释/删除 .repartition(4) 字符串,请注意 TableScan 和 partial_count 是在单个阶段内完成的,输出将如下:

== Physical Plan ==
*(2) HashAggregate(keys=[], functions=[count(1)])
+- Exchange SinglePartition
   +- *(1) HashAggregate(keys=[], functions=[partial_count(1)])
      +- *(1) FileScan json [] Batched: false, Format: JSON, Location: InMemoryFileIndex[file:/C:/Users/iaroslav/IdeaProjects/sparksandbox/src/main/resources/dataset], PartitionFilters: [], PushedFilters: [], ReadSchema: struct<>

附:请注意,额外的阶段可能会对性能产生重大影响,因为它需要磁盘 I/O(看看here)并且是某种影响并行化的同步障碍,这意味着在大多数情况下,Spark 不会启动阶段 2 直到阶段1 完成。尽管如此,如果repartition 提高并行度,它可能还是值得的。

【讨论】:

  • 感谢您的回答。这说得通。那么这个最终结果聚合任务总是应该在 Driver 上运行吗?如果我在我的 DF 中取一个字段的 max() 怎么办。那么在每个分区上找到最大值后,在驱动处找到最终值?
  • @ѕтƒ 是的,count 将始终将结果拉到驱动程序。我认为max 可能会这样做,否则它如何比较分区之间的结果。但我认为这里的重点是您不必在这里优化count 操作,因为每个分区只有一个整数被移动到驱动程序。这里最繁重的操作是repartition,如果你最初有超过4个分区,看看coalesce或者根本不重新分区(如果分区数量合理的话)。假设你有 2 个分区,也许 count 没有重新分区可能比 repartition(4).count() 快。
  • @ѕтƒ 如果谈到进一步的优化(假设您可以控制 CSV 文件的创建),您可以查看不同的文件格式(列 Parquet,对于 count 操作,读取单列而不是整个文件)。如果您想使用 CSV,还要检查压缩选项 - 比您需要可拆分的压缩选项。
  • @ѕтƒ 但这只是理论,而在实践中count 操作非常轻量级,不需要那些优化。另一个有趣的点是一些数据格式包含元数据,Spark 也有基于成本优化器的选项。这意味着对于某些格式(不是 CSV)或 CBO,对于 count 操作,Spark 不会读取数据,而只会读取元数据。在这种情况下,count 应该非常快,并且不需要 repartition 恕我直言
  • 感谢宝贵的 cmets。我不是在寻找性能改进。我想了解的是 spark 将如何协调来自不同执行器的结果并将其聚合以显示最终结果(如果它的计数或最大值或其他操作)。我想知道驱动程序从每个分区获得独立结果后,谁将在驱动程序端聚合数据。从您的回答来看,另一个任务将在驱动程序节点上运行。
猜你喜欢
  • 1970-01-01
  • 2017-11-13
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 2018-07-24
相关资源
最近更新 更多