一、分区的作用
RDD是弹性分布式数据集,通常RDD很大,会被分成很多个分区,分别保存到不同的节点上。
①、对RDD分区,第一个功能是增加并行度。
例如,1个RDD有n个分区,分布在n个不同工作节点(WorkerNode)上面,这n个工作节点分别启动n个线程对这n个分区的数据进行并行处理,从而增加了任务的并行度。(1个工作节点上面有几个分区,启动几个线程)
②、对RDD分区,第二个功能是减少通信开销。
连接(Join)是查询分析中经常发生的一种操作。
假设在某种应用中需要对两个表进行连接操作,第一个表是很大的用户信息表UserData(UserId, UserInfo),其中,UserId和UserInfo是用户信息表的两个字段,UserInfo包含了某个用户所定阅的主题信息;第二个表是比较小的Events(UserId, LinkInfo),只记录了过去五分钟内发生的事件,即某个用户访问查看了哪些链接。为了对用户访问情况进行了解,需要周期性地对UserData和Events这两个表进行连接操作,并获得(UserId, UserInfo, LinkInfo)这种形式的效果,从而知道某用户订阅的是哪个主题,以及访问了哪个链接。
在执行Spark作业时,UserData表会被加载到内存中生成RDD(即userData),RDD中的每个元素是<UserId, UserInfo>这种类型的键值对;同理,Events表会被加载到内存中生成RDD(即events),RDD中的每个元素是<UserId, LinkInfo>这种类型的键值对。
{
由于UserData是一个很大的表,通常会被存放到HDFS文件中,Spark系统会根据每个元素的数据来源,把每个RDD元素放在相应的节点上。例如:从工作节点u1上的HDFS文件块中读取到的记录,其生成的RDD元素(<UserId, LinkInfo>键值对)就会被放在节点u1上面,最终,UserData这个RDD的元素就会分布在节点u1、u2········um上。
然后,进行连接操作userData.join(events)得到连接结果。如下图:在默认情况下,连接操作会将两个数据集中的所有key的哈希值都求出来,将哈希值相同的记录传送到同一台机器上,之后在该机器上对所有key相同的记录进行连接操作。
例如,对于userData这个RDD而言,它在节点u1上的所有RDD元素,根据key的值进行哈希,再根据哈希值分发到j1、j2······jk这些节点上;同理,u2、u3······um都要这样操作。对于events这个RDD而言,也需要执行同样的操作。可以看出,在这种情况下,每次进行连接操作都会有混洗的问题,造成了很大的网络开销。
此时,深灰色的(u1、u2······um)和(j1、j2······jk)以及(e1、e2······en)的区域表示的是工作节点。
}
{
由于userData这个RDD要比events大很多,所以,可以选择对userData进行分区。
例如,采用哈希分区方法,把userData这个RDD分成m个分区,这些分区分布在节点u1、u2······um上。对userData进行分区后,再执行连接操作。因为已经对userData进行了分区,则只需要对events这个RDD的每个元素求哈希值(采用和userData同样的哈希函数),然后,根据哈希值把每个events中的RDD元素分发到对于的节点u1、u2······um上面。
在整个过程中,只有events发生了数据混洗,产生网络通信,而userData的数据都是本地引用,不会产生网络传输开销。
(先把大的表分区,然后再进行连接操作)
}
二、分区的原则
RDD分区的原则是:使分区的个数尽量等于集群中的CPU核心数目。(目的:可以实现并行)
对于不同的Spark部署模式而言(Local模式、Standalone模式、YARN模式、Mesos模式),都可以通过设置spark.default.parallelism这个参数的值,来配置默认的分区数目。各种模式下的默认分区数目如下:
| Local模式 | 默认为本地机器的CPU数目,若设置了local[N],则默认分区数为N |
| Standalone模式、YARN模式 | 在“集群中所有CPU核心数目总和”和“2”这二者中取较大值作为默认值 |
| Mesos模式 | 默认分区数为8 |
三、设置分区的个数
可以手动设置分区的数量,主要包括两种方式:
①、创建RDD时手动指定分区个数
②、使用repartition方法重新设置分区个数
1、创建RDD时手动指定分区个数
在调用textFile()和parallelize()方法的时候手动指定分区的个数。
①、sc.textFile(path, partitionNum)。其中,path参数为指定要加载的文件的地址,partitionNum参数为指定分区的个数。
对于textFile()方法而言,如果没有在方法中指定分区数,则默认为min(defaultParallelism, 2),其中,defaultParallelism对应的就是spark.default.parallelism。
②、对于parallelize()方法而言,如果没有在方法中指定分区数,则默认为spark.default.parallelism。(由部署模式决定分区数)
2、使用repartition方法重新设置分区个数
四、自定义分区方法
Spark提供了自带的哈希分区和区域分区。Spark也支持自定义分区方式,即通过提供一个自定义的Partitioner对象来控制RDD的分区方式。
Spark的分区函数针对的是<key, value>类型的RDD,分区函数根据key对RDD元素进行分区。对于一些非<key, value>类型的RDD进行自定义分区时,首先需要把RDD元素转换为<key, value>类。
要实现自定义分区,需要定义一个类,这个自定义类需要继承org.apache.spark.Partitioner类,并实现下面3个方法:
| numPartitions: Int | 返回创建出来的分区数 |
| getPartition(Key, Any): Int | 返回给定键的分区编号(0到numPartitions-1) |
| equals(): | Java判断相等性的标准方法 |
实例:要求根据key值的最后一位数字写到不同的文件中。例如,10写入到part-00000,11写入到part-00001。
其中,data.map(_, 1)表示把data中的每个Int类型的元素取出来,转换成(key, value>类型),比如,1元素取出来变成(1, 1),2元素取出来变成(2, 1),3元素取出来变成(3, 1)······················
partitionBy(new MyPartitioner(10))表示调用自定义分区函数,把(1, 1)、(2, 1)、(3, 1)、······、(10, 1)这些RDD元素根据尾号分成10个分区。
map(_._1)表示把(1, 1)、(2, 1)、(3, 1)、······、(10, 1)等(key, value)类型元素的key提取出来,得到1,2,3,···,10(已分区)。如果map(_._2)就是取出来value的值。
saveAsTextFile()方法把RDD的10个Int类型的元素写入到本地文件中。