一、分区的作用

RDD是弹性分布式数据集,通常RDD很大,会被分成很多个分区,分别保存到不同的节点上。

①、对RDD分区,第一个功能是增加并行度

例如,1个RDD有n个分区,分布在n个不同工作节点(WorkerNode)上面,这n个工作节点分别启动n个线程对这n个分区的数据进行并行处理,从而增加了任务的并行度。(1个工作节点上面有几个分区,启动几个线程)

②、对RDD分区,第二个功能是减少通信开销

连接(Join)是查询分析中经常发生的一种操作。

假设在某种应用中需要对两个表进行连接操作,第一个表是很大的用户信息表UserData(UserId, UserInfo),其中,UserId和UserInfo是用户信息表的两个字段,UserInfo包含了某个用户所定阅的主题信息;第二个表是比较小的Events(UserId, LinkInfo),只记录了过去五分钟内发生的事件,即某个用户访问查看了哪些链接。为了对用户访问情况进行了解,需要周期性地对UserData和Events这两个表进行连接操作,并获得(UserId, UserInfo, LinkInfo)这种形式的效果,从而知道某用户订阅的是哪个主题,以及访问了哪个链接。

在执行Spark作业时,UserData表会被加载到内存中生成RDD(即userData),RDD中的每个元素是<UserId, UserInfo>这种类型的键值对;同理,Events表会被加载到内存中生成RDD(即events),RDD中的每个元素是<UserId, LinkInfo>这种类型的键值对。

{

由于UserData是一个很大的表,通常会被存放到HDFS文件中,Spark系统会根据每个元素的数据来源,把每个RDD元素放在相应的节点上。例如:从工作节点u1上的HDFS文件块中读取到的记录,其生成的RDD元素(<UserId, LinkInfo>键值对)就会被放在节点u1上面,最终,UserData这个RDD的元素就会分布在节点u1、u2········um上。

然后,进行连接操作userData.join(events)得到连接结果。如下图:在默认情况下,连接操作会将两个数据集中的所有key的哈希值都求出来,将哈希值相同的记录传送到同一台机器上,之后在该机器上对所有key相同的记录进行连接操作。

例如,对于userData这个RDD而言,它在节点u1上的所有RDD元素,根据key的值进行哈希,再根据哈希值分发到j1、j2······jk这些节点上;同理,u2、u3······um都要这样操作。对于events这个RDD而言,也需要执行同样的操作。可以看出,在这种情况下,每次进行连接操作都会有混洗的问题,造成了很大的网络开销。

RDD分区

此时,深灰色的(u1、u2······um)和(j1、j2······jk)以及(e1、e2······en)的区域表示的是工作节点。 

}

{

由于userData这个RDD要比events大很多,所以,可以选择对userData进行分区。

例如,采用哈希分区方法,把userData这个RDD分成m个分区,这些分区分布在节点u1、u2······um上。对userData进行分区后,再执行连接操作。因为已经对userData进行了分区,则只需要对events这个RDD的每个元素求哈希值(采用和userData同样的哈希函数),然后,根据哈希值把每个events中的RDD元素分发到对于的节点u1、u2······um上面。

在整个过程中,只有events发生了数据混洗,产生网络通信,而userData的数据都是本地引用,不会产生网络传输开销。

先把大的表分区,然后再进行连接操作

RDD分区

}

二、分区的原则

RDD分区的原则是:使分区的个数尽量等于集群中的CPU核心数目。(目的:可以实现并行

对于不同的Spark部署模式而言(Local模式、Standalone模式、YARN模式、Mesos模式),都可以通过设置spark.default.parallelism这个参数的值,来配置默认的分区数目。各种模式下的默认分区数目如下:

Local模式 默认为本地机器的CPU数目,若设置了local[N],则默认分区数为N
Standalone模式、YARN模式 在“集群中所有CPU核心数目总和”和“2”这二者中取较大值作为默认值
Mesos模式 默认分区数为8

三、设置分区的个数

可以手动设置分区的数量,主要包括两种方式:

①、创建RDD时手动指定分区个数

②、使用repartition方法重新设置分区个数

 1、创建RDD时手动指定分区个数

在调用textFile()和parallelize()方法的时候手动指定分区的个数。

①、sc.textFile(path, partitionNum)。其中,path参数为指定要加载的文件的地址,partitionNum参数为指定分区的个数。

对于textFile()方法而言,如果没有在方法中指定分区数,则默认为min(defaultParallelism, 2),其中,defaultParallelism对应的就是spark.default.parallelism。

②、对于parallelize()方法而言,如果没有在方法中指定分区数,则默认为spark.default.parallelism。(由部署模式决定分区数)

       RDD分区

 2、使用repartition方法重新设置分区个数

     RDD分区

 四、自定义分区方法

Spark提供了自带的哈希分区和区域分区。Spark也支持自定义分区方式,即通过提供一个自定义的Partitioner对象来控制RDD的分区方式。

Spark的分区函数针对的是<key, value>类型的RDD,分区函数根据key对RDD元素进行分区。对于一些非<key, value>类型的RDD进行自定义分区时,首先需要把RDD元素转换为<key, value>类。

要实现自定义分区,需要定义一个类,这个自定义类需要继承org.apache.spark.Partitioner类,并实现下面3个方法:

numPartitions: Int 返回创建出来的分区数
getPartition(Key, Any): Int 返回给定键的分区编号(0到numPartitions-1)
equals(): Java判断相等性的标准方法

实例:要求根据key值的最后一位数字写到不同的文件中。例如,10写入到part-00000,11写入到part-00001。

RDD分区

RDD分区 

其中,data.map(_, 1)表示把data中的每个Int类型的元素取出来,转换成(key, value>类型),比如,1元素取出来变成(1, 1),2元素取出来变成(2, 1),3元素取出来变成(3, 1)······················

partitionBy(new  MyPartitioner(10))表示调用自定义分区函数,把(1, 1)、(2, 1)、(3, 1)、······、(10, 1)这些RDD元素根据尾号分成10个分区。

map(_._1)表示把(1, 1)、(2, 1)、(3, 1)、······、(10, 1)等(key, value)类型元素的key提取出来,得到1,2,3,···,10(已分区)。如果map(_._2)就是取出来value的值。

saveAsTextFile()方法把RDD的10个Int类型的元素写入到本地文件中。

相关文章:

  • 2021-03-28
  • 2022-12-23
  • 2021-05-27
  • 2021-12-08
  • 2021-06-22
  • 2021-12-13
  • 2022-12-23
  • 2021-11-26
猜你喜欢
  • 2021-04-28
  • 2021-08-19
  • 2021-08-02
  • 2021-07-28
  • 2021-12-21
  • 2022-12-23
相关资源
相似解决方案