【发布时间】:2021-04-01 15:25:11
【问题描述】:
我逐个城市确定活动部门的员工和企业数量:
|codeCommune|nomCommune |regroupement|section|libelleAPE |nombreEntreprises|nombreSalaries|nombreActifs|
+-----------+--------------------+------------+-------+---------------------------------------------------------------------------------------------------------------------------------+-----------------+--------------+------------+
|14654 |Saint-Pierre-en-Auge|84.11Z |O |Administration publique générale |3 |153.5 |169.5 |
|14654 |Saint-Pierre-en-Auge|16.24Z |C |Fabrication d'emballages en bois |1 |149.5 |150.5 |
|14654 |Saint-Pierre-en-Auge|10.11Z |C |Transformation et conservation de la viande de boucherie |1 |149.5 |150.5 |
具有由用户设置的分组级别(以下regroupement):
+-----------+--------------------+------------+-------+------------------------------------------------------------------------------------------------------------------------------------------+-----------------+--------------+------------+
|codeCommune|nomCommune |regroupement|section|libelleAPE |nombreEntreprises|nombreSalaries|nombreActifs|
+-----------+--------------------+------------+-------+------------------------------------------------------------------------------------------------------------------------------------------+-----------------+--------------+------------+
|14654 |Saint-Pierre-en-Auge|10 |C |Industries alimentaires |16 |208.0 |225.0 |
|14654 |Saint-Pierre-en-Auge|86 |Q |Activités pour la santé humaine |46 |169.5 |218.5 |
|14654 |Saint-Pierre-en-Auge|84 |O |Administration publique et défense ; sécurité sociale obligatoire |5 |153.5 |171.5 |
工作就是这样完成的:
-
从
Dataset的企业和机构中,按部门代码(大约是城市代码的前两个字符)划分,选择这些列:-
city_code, -
city_name, -
grouping(我们保留的活动代码部分:84.11Z或84), -
section(总结活动部门的代码:工业、商业等), -
activity_description, -
siren(企业标识符:一个企业可能有很多机构), -
number_of_workers, number_of_actives_people
-
-
groupBy已完成:
RelationalGroupedDataset group = enterprisesAndEstablishments
.groupBy("city_code", "city_name", "grouping", "section", "activity_description");
- 然后我通过聚合执行计算:
group.agg(countDistinct("siren").as("nombreEntreprises"),
sum("number_of_workers").as("nombreSalaries"),
sum("number_of_actives_people").as("nombreActifs"));
我的问题是groupBy 方法不关心数据集分区,而是从数据集enterprisesAndEstablishments 的任何分区收集其数据,并对大量数据进行全局排序,
仅针对一部分时效率更高:此示例中的所有活动都在分区[codeDepartement=14]中。
我希望它尊重这些分区并在它们的级别上执行groupBy,以避免随机播放。
我正在寻找groupBy 的sortWithPartitions 伴侣。可以称为groupWithinPartitions 的东西,但我没找到。
实现我的目标的最佳方法是什么,
或者如果没有工具,我应该选择什么替代方案?
【问题讨论】:
-
也许您可以查看物理执行计划以进行最终计算。您应该看到 Spark 首先应用了相应的
partial_xxxx函数(对于分区内的所有groupby键),然后对结果进行洗牌以计算最终的每个键聚合。
标签: java apache-spark partitioning