The Significance of Partitioning

A Spark RDD is a distributed dataset. Because the data volume is large, it is split into partitions that are stored in memory across the Worker nodes, so operations on an RDD are actually executed in parallel on the data in each partition. Partitioning data by a field in Spark is similar to partitioning in a relational database: it increases parallelism and improves execution efficiency. When Spark reads a file from HDFS, the default number of partitions equals the number of HDFS blocks of that file; a block is the smallest unit of distributed storage in HDFS.
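
As a quick illustration, the partition count of an RDD can be inspected directly. This is only a sketch: it assumes an existing SparkContext named sc, the HDFS path is a placeholder, and the second argument to textFile requests a minimum number of partitions rather than an exact count.

// Sketch: inspecting how many partitions Spark created (the HDFS path is a placeholder).
val fileRDD = sc.textFile("hdfs:///data/input.txt")        // by default, one partition per HDFS block
println(fileRDD.getNumPartitions)                          // number of partitions actually created
val wider = sc.textFile("hdfs:///data/input.txt", 8)       // ask for at least 8 partitions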

(Figure: Spark Partition)

1. The difference between RDD repartition and partitionBy

repartition and partitionBy are the two repartitioning operators most commonly used on RDDs in Spark. Both redistribute data across a new set of partitions, and both rely on HashPartitioner by default, but partitionBy can only be applied to a PairRDD (key-value data). Even when both are applied to a PairRDD, the results differ: repartition spreads records across partitions essentially at random, with no visible pattern, whereas partitionBy hashes the key, so all records with the same key end up in the same partition.

val parRDD = pairRDD.repartition(10)                       // repartition into 10 partitions
val parRDD = pairRDD.partitionBy(new HashPartitioner(10))  // repartition into 10 partitions by key hash
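
For reference, HashPartitioner decides the target partition from the key's hashCode. Below is a minimal sketch of that rule (the helper name partitionFor is illustrative, not part of Spark's API, and the null-key case is omitted):

// Sketch of HashPartitioner's rule: a non-negative modulo of the key's hashCode.
def partitionFor(key: Any, numPartitions: Int): Int = {
  val rawMod = key.hashCode % numPartitions
  rawMod + (if (rawMod < 0) numPartitions else 0)          // shift negative results into range
}
// Every ("jason", 1) record therefore lands in partition partitionFor("jason", 10).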

import org.apache.log4j.{Level, Logger}
import org.apache.spark.{HashPartitioner, SparkConf, SparkContext, TaskContext}
import org.apache.spark.rdd.RDD 

object PartitionDemo {
 
  Logger.getLogger("org.apache.spark").setLevel(Level.ERROR)
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("localTest").setMaster("local[4]")
    val sc = new SparkContext(conf)
    
    val rdd = sc.parallelize(List("hello", "jason", "what", "are", "you", "doing","hi","jason","do","you","eat","dinner",
            "hello","jason","do","you","have","some","time","hello","jason","time","do","you","jason","jason"),4) // create the RDD with 4 partitions
    val word_count = rdd.flatMap(_.split(",")).map((_,1)) 
    val repar = word_count.repartition(10)                       // repartition into 10 partitions
    val parby = word_count.partitionBy(new HashPartitioner(10))  // repartition into 10 partitions by key hash
    print(repar)
    print(parby)
    sc.stop()
  }
  
  def print(rdd : RDD[(String, Int)]) = {
    rdd.foreachPartition(pair=>{
      println("partion " + TaskContext.get.partitionId + ":") 
      pair.foreach(p=>{ println("  " + p) })
    })
    println
  }
}
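
Because foreachPartition runs inside executor tasks, the println output of the demo above can interleave between partitions. Here is a small sketch (assuming the same word_count, repar and parby RDDs from the demo) that collects each partition's contents back to the driver for a deterministic printout:

// Sketch: gather (partitionId, records) pairs on the driver and print them in order.
def show(rdd: RDD[(String, Int)]): Unit = {
  rdd.mapPartitionsWithIndex { (idx, iter) =>
    Iterator((idx, iter.toList))                           // pair each partition id with its records
  }.collect().sortBy(_._1).foreach { case (idx, records) =>
    println(s"partition $idx: ${records.mkString(", ")}")
  }
}
// show(parby) prints all ("jason", 1) records inside a single partition,
// while show(repar) spreads them across several partitions.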
