The Significance of Partitioning
A Spark RDD is a distributed dataset. Because the data is typically too large for a single machine, it is split into partitions stored in memory across the Worker nodes, so an operation on the RDD actually runs in parallel on each partition's data. Partitioning by a field in Spark is similar to partitioning in a relational database: it increases parallelism and improves execution efficiency. When Spark reads a file from HDFS, the default number of partitions equals the number of HDFS blocks of that file; a block is the smallest unit of distributed storage in HDFS.
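To see how many partitions an RDD actually got, you can call getNumPartitions. A minimal sketch, assuming a local master and a placeholder HDFS path:

import org.apache.spark.{SparkConf, SparkContext}

object PartitionCountDemo {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("partitionCount").setMaster("local[4]")
    val sc = new SparkContext(conf)

    // One partition per HDFS block by default (the path below is a placeholder).
    val fromHdfs = sc.textFile("hdfs:///tmp/some_file.txt")
    println(fromHdfs.getNumPartitions)

    // For parallelize, the partition count can be given explicitly.
    val fromMemory = sc.parallelize(1 to 100, 4)
    println(fromMemory.getNumPartitions) // 4

    sc.stop()
  }
}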
1. The difference between RDD repartition and partitionBy
repartition and partitionBy are the two most commonly used repartitioning operators on RDDs. Both reshuffle the data into a new set of partitions, and both shuffle with a HashPartitioner by default, but partitionBy can only be applied to a PairRDD (key-value data). Even when both are applied to a PairRDD, the results differ: repartition scatters records across partitions with no regard to their keys (it hashes synthetic keys rather than the data's own keys, so the placement looks random), while partitionBy hashes the actual keys, so all records with the same key land in the same partition.
val parRDD = pairRDD.repartition(10) // repartition into 10 partitions
val parRDD = pairRDD.partitionBy(new HashPartitioner(10)) // hash-partition by key into 10 partitions
import org.apache.log4j.{Level, Logger}
import org.apache.spark.{HashPartitioner, SparkConf, SparkContext, TaskContext}
import org.apache.spark.rdd.RDD

object PartitionDemo {
  Logger.getLogger("org.apache.spark").setLevel(Level.ERROR)

  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("localTest").setMaster("local[4]")
    val sc = new SparkContext(conf)
    val rdd = sc.parallelize(List("hello", "jason", "what", "are", "you", "doing", "hi", "jason",
      "do", "you", "eat", "dinner", "hello", "jason", "do", "you", "have", "some", "time",
      "hello", "jason", "time", "do", "you", "jason", "jason"), 4) // start with 4 partitions
    val word_count = rdd.flatMap(_.split(",")).map((_, 1))
    val repar = word_count.repartition(10)                      // repartition into 10 partitions
    val parby = word_count.partitionBy(new HashPartitioner(10)) // hash-partition by key into 10 partitions
    print(repar)
    print(parby)
    sc.stop()
  }

  // Print each partition's contents, labeled with its partition id.
  def print(rdd: RDD[(String, Int)]): Unit = {
    rdd.foreachPartition(pair => {
      println("partition " + TaskContext.get.partitionId + ":")
      pair.foreach(p => {
        println("  " + p)
      })
    })
    println()
  }
}
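Another way to observe the difference: the RDD returned by partitionBy remembers its partitioner, while the one returned by repartition does not. A small sketch, reusing repar and parby from the demo above:

// repar came from repartition: no partitioner is recorded.
println(repar.partitioner) // None
// parby came from partitionBy: the HashPartitioner is retained, so later
// key-based operations (e.g. reduceByKey) on it can avoid another shuffle.
println(parby.partitioner) // Some(HashPartitioner) -- exact toString varies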