1、什么是RDD? RDD的5大特性。
RDD是spark中的一种抽象,他是弹性分布式数据集.
a) RDD由一系列的partition组成
b) 算子作用在partition上
c) RDD之间具有依赖关系
d) partition提供了最佳计算位置(体现了移动计算不移动数据思想)
e) 分区器作用在K、V格式的RDD上。
哪里体现了弹性、分布式?
弹性:partition可大可小,可多可少。RDD之间具有依赖关系;能做到很好的容错。
分布式:partition分布在集群的不同节点上。
2、怎么理解partition,如何合理的设置partition的数量。
在HDFS存储的数据是以block块的形式进行物理存储。RDD是Spark加载HDFS数据到内存中的抽象,里面不存储数据。RDD是由partion组成的,partion被分布在多个节点上。partion是数据的逻辑存储,实际上不存储数据集。当我们对RDD进行操作时,实际上是对每个分区中的数据并行操作。
Spark从HDFS读入文件的分区数默认等于HDFS文件的块数(blocks),HDFS中的block是分布式存储的最小单元。如果我们上传一个30GB的非压缩的文件到HDFS,HDFS默认的块容量大小128MB,因此该文件在HDFS上会被分为235块(30GB/128MB);Spark读取SparkContext.textFile()读取该文件,默认分区数等于块数即235。
如何设置合理的分区数
1、分区数越多越好吗?
不是的,分区数太多意味着任务数太多,每次调度任务也是很耗时的,所以分区数太多会导致总体耗时增多。
2、分区数太少会有什么影响?
分区数太少的话,会导致一些结点没有分配到任务;另一方面,分区数少则每个分区要处理的数据量就会增大,从而对每个结点的内存要求就会提高;还有分区数不合理,会导致数据倾斜问题。
3、合理的分区数是多少?如何设置?
总核数=executor-cores * num-executor
一般合理的分区数设置为总核数的2~3倍
3、RDD或者partition里面存储数据吗?怎么理解内存计算。
RDD或者partition里面不存储数据,partition只是数据的逻辑存储。实际存储数据的地方在block中。
在RDD依赖关系中,在没有action算子触发之前,RDD之间的依赖只是一种逻辑关系,不会触发真正的计算
当action算子触发后数据进行计算,计算的中间结果存储在内存中。
4、Spark中的hello world (word count)
使用java、scala和spark-shell开发wordCount程序。 spark-shell 其实就是spark的命令行,方便测试,验证程序。
java开发
/**
* 本地测试wordCount
*
* @author yangshaojun
* #date 2019/3/9 10:00
* @version 1.0
*/
public class Demo000_WordCountLocal {
public static void main(String[] args) {
// 编写spark应用程序
// 第一步:创建sparkconf对象,设置spark应用配置信息
// 使用setMaster()设置Spark应用程序要连接的Spark集群的Master节点的url
// 如果设置local则代表在本地执行
SparkConf conf = new SparkConf().setAppName("wordCount").setMaster("local[2]");
// 第二步: 创建JavaSparkContext对象
// 在Spark中SparkContext是spark所有功能的入口,无论使用java、scala还是python编写都必须有一个sparkContext对象.
// 它的作用:初始化spark应用程序所需要的的一些核心组件,包括调度器(DAGSchedule、TaskSchedule)
// 还回去spark Master节点上进行注册等等。
// 但是呢,在spark中编写不同类型的spark应用程序使用的spakContext是不同的,
// 如果使用scala那么就是sparkContext对象
// 如果使用java那么就是JavaSparkContext对象
// 如果开发spark Sql程序那么就是SQLContext或者HiveContext
// 如果是开发Spark Streaming程序,那么就是它独有的SparkContext。
JavaSparkContext sc = new JavaSparkContext(conf);
// 第三步:从数据源(HDFS、本地文件等)加载数据 创建一个RDD
// parkContext中根据文件的类型输入源创建的RDD的方法 是textFile()
// 在java中,创建的普通RDD都叫做JavaRDD
// 在这里呢,RDD有元素的概念,如果是HDFS或者是本地文件创建的RDD,其每个元素相当于文件的一行数据。
JavaRDD<String> linesRDD = sc.textFile("data/sparkcore/wordcount");
// 第四步: 对初始的RDD进行transformation操作
// 将每一行拆分成单个的单词
JavaRDD<String> words = linesRDD.flatMap(new FlatMapFunction<String, String>() {
@Override
public Iterable<String> call(String line) throws Exception {
String[] str = line.split(",");
return Arrays.asList(line.split(","))
}
});
JavaPairRDD<String, Integer> word = words.mapToPair(new PairFunction<String, String, Integer>() {
@Override
public Tuple2<String, Integer> call(String s) {
return new Tuple2<String, Integer>(s, 1);
}
});
JavaPairRDD<String, Integer> result = word.reduceByKey(new Function2<Integer, Integer, Integer>() {
@Override
public Integer call(Integer int1, Integer int2) throws Exception {
return int1 + int2;
}
});
result.foreach(new VoidFunction<Tuple2<String, Integer>>() {
@Override
public void call(Tuple2<String, Integer> stringIntegerTuple2) throws Exception {
System.out.println(stringIntegerTuple2);
}
});
}
}
提交spark程序到集群运行
Note:
1)如果要在spark集群上运行应用程序,需要将本地开发中的 setMaster()删除默认自己连接,同时将本地文件改为 hdfs文件。
2)对Spark应用程序打成jar包,上传到服务器上
3)编写spark-submit脚本、执行spark-submit脚本、提交应用程序到集群执行。
./spark-submit
--class com.netcloud.bigdata.spark_core.basiclearning.demo.Demo000_WordCount
--num-executor 3 \
--driver-memory 100m \
--executor-memory 100m \
--executor-cores 3 \
/opt/wordcount.jar
Note:在这里我们提交程序的时候没有加 --master spark://192.168.2.100:7077 表示任务的执行是在local模式下执行的。
否则就是在standalone模式下执行。
scala开发
object Demo000_WordCount {
def main(args: Array[String]): Unit = {
val conf = new SparkConf().setAppName("word_count").setMaster("local[2]")
val sc = new SparkContext(conf)
val linesRDD = sc.textFile("data/sparkcore/wordcount.txt")
val wordsRDD = linesRDD.flatMap(line => line.split(","))
val wordTuple = wordsRDD.map(word => (word, 1))
val retRdd = wordTuple.reduceByKey((a: Int, b: Int) => (a + b))
retRdd.foreach(println)
}
}
spark-shell开发(主要用于简单的测试)
深度剖析wordCount
Note:reduceBykey算子产生shuffle,什么是shuffle?shuffle就是数据重新进行分区、落地到磁盘的过程。
5、Spark架构原理
-- Application:用户编写的Spark应用程序。
-- Driver : Driver就是一个进程,我们编写的spark应用程序就在Driver上面;由Driver进程执行。根据任务提交的方式不同Driver所处的节点位置不同;Driver可能处在我们提交spark程序的机器上面,也有可能处在spark集群中的某个节点之上。
-- Master: 它是Spark集群的主进程;负责负责资源的调度和分配、和集群的监控。
-- Worker: 它是Spark集群的进程;通过自己的内存对partition进行存储,启动其他的进程或者线程对Rdd中的partition进行计算。
-- Executor:Executor是work节点的进程,在Executor上面可以启动多个Task线程。
-- Task:在Executor进程上面启动的线程。
Note:
Executor和Task其实就是负责执行,对RDD中的partition进行并行的计算,也就是执行我们对RDD定义的map、flatMap算子操作。
6、创建初始的RDD
1)通过程序中集合的方式创建RDD、主要用于测试使用。 通过parallelizer()方法
2)使用本地文件创建RDD;主要是临时性处理一些存储大量的文件 textFile(path)方法可以支配到目录加载整个目录的文件。
3)使用HDFS文件创建RDD;主要用于生产环境;进行离线批处理。 textFile(path)方法
7、RDD算子操作
转换和行动算子。
转换算子:它是懒执行的,当真正的触发行动算子后才进行之前的转换计算。
行动算子:触发真正spark job的操作。 reduce 、collect 、count、first、take、countByKey()、takeSample、foreach、save
等9种行动算子。
详细代码链接:https://blog.csdn.net/yangshaojun1992/article/details/88376322
详细代码链接:https://blog.csdn.net/yangshaojun1992/article/details/88376963
8、RDD持久化操作
不使用RDD持久化带来的问题:
持久化后的情况:
代码实例链接:https://blog.csdn.net/yangshaojun1992/article/details/88377557
9、RDD共享变量
共享变量的原理
广播变量:
详细代码链接:https://blog.csdn.net/yangshaojun1992/article/details/88378882
累加器:
详细代码链接:https://blog.csdn.net/yangshaojun1992/article/details/88378882