第一阶段 = 读取文件。由于重新分区(因为它是需要改组的广泛转换),它不能加入到带有 partial_count 的单个阶段(第二阶段)
第二阶段 = 本地计数(计算每个分区的计数)
第三阶段 = 在驱动程序上聚合结果。
Spark 生成每个动作或广泛转换的单独阶段。要了解有关窄/宽转换以及为什么宽转换需要单独阶段的更多详细信息,请查看 "Wide Versus Narrow Dependencies, High Performance Spark, Holden Karau" 或 this article。
让我们在本地测试这个假设。首先你需要创建一个数据集:
数据集/test-data.json
[
{ "key": 1, "value": "a" },
{ "key": 2, "value": "b" },
{ "key": 3, "value": "c" },
{ "key": 4, "value": "d" },
{ "key": 5, "value": "e" },
{ "key": 6, "value": "f" },
{ "key": 7, "value": "g" },
{ "key": 8, "value": "h" }
]
然后运行以下代码:
StructType schema = new StructType()
.add("key", DataTypes.IntegerType)
.add("value", DataTypes.StringType);
SparkSession session = SparkSession.builder()
.appName("sandbox")
.master("local[*]")
.getOrCreate();
session
.read()
.schema(schema)
.json("file:///C:/<you_path>/dataset")
.repartition(4) // comment on the second run
.registerTempTable("df");
session.sqlContext().sql("SELECT COUNT(*) FROM df").explain();
输出将是:
== Physical Plan ==
*(3) HashAggregate(keys=[], functions=[count(1)])
+- Exchange SinglePartition
+- *(2) HashAggregate(keys=[], functions=[partial_count(1)])
+- Exchange RoundRobinPartitioning(4)
+- *(1) FileScan json [] Batched: false, Format: JSON, Location: InMemoryFileIndex[file:/C:/Users/iaroslav/IdeaProjects/sparksandbox/src/main/resources/dataset], PartitionFilters: [], PushedFilters: [], ReadSchema: struct<>
但如果您注释/删除 .repartition(4) 字符串,请注意 TableScan 和 partial_count 是在单个阶段内完成的,输出将如下:
== Physical Plan ==
*(2) HashAggregate(keys=[], functions=[count(1)])
+- Exchange SinglePartition
+- *(1) HashAggregate(keys=[], functions=[partial_count(1)])
+- *(1) FileScan json [] Batched: false, Format: JSON, Location: InMemoryFileIndex[file:/C:/Users/iaroslav/IdeaProjects/sparksandbox/src/main/resources/dataset], PartitionFilters: [], PushedFilters: [], ReadSchema: struct<>
附:请注意,额外的阶段可能会对性能产生重大影响,因为它需要磁盘 I/O(看看here)并且是某种影响并行化的同步障碍,这意味着在大多数情况下,Spark 不会启动阶段 2 直到阶段1 完成。尽管如此,如果repartition 提高并行度,它可能还是值得的。