【问题标题】:Apache Spark : Make a multiple columns groupBy on a Dataset works withtin partitionApache Spark:使数据集上的多列 groupBy 适用于分区
【发布时间】:2021-04-01 15:25:11
【问题描述】:

我逐个城市确定活动部门的员工和企业数量:

|codeCommune|nomCommune          |regroupement|section|libelleAPE                                                                                                                       |nombreEntreprises|nombreSalaries|nombreActifs|
+-----------+--------------------+------------+-------+---------------------------------------------------------------------------------------------------------------------------------+-----------------+--------------+------------+
|14654      |Saint-Pierre-en-Auge|84.11Z      |O      |Administration publique générale                                                                                                 |3                |153.5         |169.5       |
|14654      |Saint-Pierre-en-Auge|16.24Z      |C      |Fabrication d'emballages en bois                                                                                                 |1                |149.5         |150.5       |
|14654      |Saint-Pierre-en-Auge|10.11Z      |C      |Transformation et conservation de la viande de boucherie                                                                         |1                |149.5         |150.5       |

具有由用户设置的分组级别(以下regroupement):

+-----------+--------------------+------------+-------+------------------------------------------------------------------------------------------------------------------------------------------+-----------------+--------------+------------+
|codeCommune|nomCommune          |regroupement|section|libelleAPE                                                                                                                                |nombreEntreprises|nombreSalaries|nombreActifs|
+-----------+--------------------+------------+-------+------------------------------------------------------------------------------------------------------------------------------------------+-----------------+--------------+------------+
|14654      |Saint-Pierre-en-Auge|10          |C      |Industries alimentaires                                                                                                                   |16               |208.0         |225.0       |
|14654      |Saint-Pierre-en-Auge|86          |Q      |Activités pour la santé humaine                                                                                                           |46               |169.5         |218.5       |
|14654      |Saint-Pierre-en-Auge|84          |O      |Administration publique et défense ; sécurité sociale obligatoire                                                                         |5                |153.5         |171.5       |

工作就是这样完成的:

  1. Dataset 的企业和机构中,按部门代码(大约是城市代码的前两个字符)划分,选择这些列:

    • city_code,
    • city_name,
    • grouping(我们保留的活动代码部分:84.11Z84),
    • section(总结活动部门的代码:工业、商业等),
    • activity_description,
    • siren(企业标识符:一个企业可能有很多机构),
    • number_of_workers,
    • number_of_actives_people
  2. groupBy 已完成:

RelationalGroupedDataset group = enterprisesAndEstablishments
   .groupBy("city_code", "city_name", "grouping", "section", "activity_description");
  1. 然后我通过聚合执行计算:
group.agg(countDistinct("siren").as("nombreEntreprises"), 
   sum("number_of_workers").as("nombreSalaries"),
   sum("number_of_actives_people").as("nombreActifs"));

我的问题是groupBy 方法不关心数据集分区,而是从数据集enterprisesAndEstablishments 的任何分区收集其数据,并对大量数据进行全局排序,
仅针对一部分时效率更高:此示例中的所有活动都在分区[codeDepartement=14]中。

我希望它尊重这些分区并在它们的级别上执行groupBy,以避免随机播放。

我正在寻找groupBysortWithPartitions 伴侣。可以称为groupWithinPartitions 的东西,但我没找到。

实现我的目标的最佳方法是什么,
或者如果没有工具,我应该选择什么替代方案?

【问题讨论】:

  • 也许您可以查看物理执行计划以进行最终计算。您应该看到 Spark 首先应用了相应的 partial_xxxx 函数(对于分区内的所有 groupby 键),然后对结果进行洗牌以计算最终的每个键聚合。

标签: java apache-spark partitioning


【解决方案1】:

您可以使用 RDD 低级函数 aggregateByKey 来实现相同的目的,它是 Spark 中可用的聚合函数之一(其他是 reduceByKeygroupByKey),一个不同之处在于它是三者中强大的一个.

聚合键不需要对相同的数据类型进行操作,可以在分区内进行不同的聚合(最大值、最小值、平均值、总和和计数),并在分区之间进行不同的聚合。

case class EnterpriseEmp(
    city_code: Long,
    city_name: String,
    grouping: Int,
    section: String,
    activity_description: String,
    siren: String,
    number_of_workers: Long,
    number_of_actives_people: Long
)

val empList =
      Array(
        EnterpriseEmp(14654, "Saint-Pierre-en-Auge", 10, "C", "Industries alimentaires", "A1", 100, 100),
        EnterpriseEmp(14654, "Saint-Pierre-en-Auge", 10, "C", "Industries alimentaires", "A1", 150, 200),
        EnterpriseEmp(14654, "Saint-Pierre-en-Auge", 10, "C", "Industries alimentaires", "A1", 200, 300),
        EnterpriseEmp(14654, "Saint-Pierre-en-Auge", 86, "Q", "Activités pour la santé humaine", "B1", 1000, 1001),
        EnterpriseEmp(14654, "Saint-Pierre-en-Auge", 86, "Q", "Activités pour la santé humaine", "B1", 1050, 2001),
        EnterpriseEmp(14654, "Saint-Pierre-en-Auge", 86, "Q", "Activités pour la santé humaine", "B1", 2000, 3001)
      )
val data = sc.parallelize(empList)
val keyed = data.keyBy(key =>
  (
    key.city_code,
    key.city_name,
    key.grouping,
    key.section,
    key.activity_description
  )
)

aggregateByKey 需要 3 个主要输入:

  1. zeroValue:初始值,不会影响聚合值。
  2. Combiner 函数:此函数接受两个参数。第二个参数合并到第一个参数中。此函数合并/合并单个分区中的值。
  3. Reduce/Merge 函数:此函数还接受两个参数。这里的参数跨 RDD 分区合并为一个。
val init_value = (0L, 0L, 0L) //sum("number_of_workers"), sum("number_of_actives_people"), count("siren")
val combinerFunc = (inter: (Long, Long, Long), value: EnterpriseEmp) => {
  (
    inter._1 + value.number_of_workers,
    inter._2 + value.number_of_actives_people,
    inter._3 + 1
  )
}
val reduceFunc = (p1: (Long, Long, Long), p2: (Long, Long, Long)) => {
  (p1._1 + p2._1, p1._2 + p2._2, p1._3 + p2._3)
}
val output = keyed.aggregateByKey(init_value)(combinerFunc, reduceFunc)

输出:

output.collect.foreach(println)
((14654,Saint-Pierre-en-Auge,86,Q,Activités pour la santé humaine),(4050,6003,3))
((14654,Saint-Pierre-en-Auge,10,C,Industries alimentaires),(450,600,3))

【讨论】:

  • 看起来很有趣,我会尽快尝试。奇怪的是,RDD 中的一些有用功能无法从数据集或数据帧中轻松访问。这不是唯一一个。
  • keyBy 也是类似 map() MapPartitionsRDD[1] at keyBy 的狭义变换。这与 Hadoop MR 的行为方式类似,并且可以进行不同的计数。
猜你喜欢
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 2019-02-01
  • 2017-04-01
  • 1970-01-01
  • 2018-09-17
  • 1970-01-01
  • 2018-07-15
相关资源
最近更新 更多