1、什么是RDD? RDD的5大特性。

    RDD是spark中的一种抽象,他是弹性分布式数据集.

    a) RDD由一系列的partition组成

    b) 算子作用在partition上

    c) RDD之间具有依赖关系

    d) partition提供了最佳计算位置(体现了移动计算不移动数据思想)

    e) 分区器作用在K、V格式的RDD上。

    哪里体现了弹性、分布式?

    弹性:partition可大可小,可多可少。RDD之间具有依赖关系;能做到很好的容错。

    分布式:partition分布在集群的不同节点上。

2、怎么理解partition,如何合理的设置partition的数量。

      在HDFS存储的数据是以block块的形式进行物理存储。RDD是Spark加载HDFS数据到内存中的抽象,里面不存储数据。RDD是由partion组成的,partion被分布在多个节点上。partion是数据的逻辑存储,实际上不存储数据集。当我们对RDD进行操作时,实际上是对每个分区中的数据并行操作。

    Spark从HDFS读入文件的分区数默认等于HDFS文件的块数(blocks),HDFS中的block是分布式存储的最小单元。如果我们上传一个30GB的非压缩的文件到HDFS,HDFS默认的块容量大小128MB,因此该文件在HDFS上会被分为235块(30GB/128MB);Spark读取SparkContext.textFile()读取该文件,默认分区数等于块数即235。

   如何设置合理的分区数

1、分区数越多越好吗?

不是的,分区数太多意味着任务数太多,每次调度任务也是很耗时的,所以分区数太多会导致总体耗时增多。

2、分区数太少会有什么影响?

分区数太少的话,会导致一些结点没有分配到任务;另一方面,分区数少则每个分区要处理的数据量就会增大,从而对每个结点的内存要求就会提高;还有分区数不合理,会导致数据倾斜问题。

3、合理的分区数是多少?如何设置?

总核数=executor-cores * num-executor

一般合理的分区数设置为总核数的2~3倍

3、RDD或者partition里面存储数据吗?怎么理解内存计算。

  RDD或者partition里面不存储数据,partition只是数据的逻辑存储。实际存储数据的地方在block中。

  在RDD依赖关系中,在没有action算子触发之前,RDD之间的依赖只是一种逻辑关系,不会触发真正的计算

 当action算子触发后数据进行计算,计算的中间结果存储在内存中。

4、Spark中的hello world (word count)

    使用java、scala和spark-shell开发wordCount程序。 spark-shell 其实就是spark的命令行,方便测试,验证程序

   java开发


/**
 * 本地测试wordCount
 *
 * @author yangshaojun
 * #date  2019/3/9 10:00
 * @version 1.0
 */
public class Demo000_WordCountLocal {
    public static void main(String[] args) {
        //  编写spark应用程序
        //  第一步:创建sparkconf对象,设置spark应用配置信息
        //  使用setMaster()设置Spark应用程序要连接的Spark集群的Master节点的url
        //  如果设置local则代表在本地执行
        SparkConf conf = new SparkConf().setAppName("wordCount").setMaster("local[2]");
        //  第二步: 创建JavaSparkContext对象
        //  在Spark中SparkContext是spark所有功能的入口,无论使用java、scala还是python编写都必须有一个sparkContext对象.
        //  它的作用:初始化spark应用程序所需要的的一些核心组件,包括调度器(DAGSchedule、TaskSchedule)
        //  还回去spark Master节点上进行注册等等。
        //  但是呢,在spark中编写不同类型的spark应用程序使用的spakContext是不同的,
        //  如果使用scala那么就是sparkContext对象
        //  如果使用java那么就是JavaSparkContext对象
        //  如果开发spark Sql程序那么就是SQLContext或者HiveContext
        //  如果是开发Spark Streaming程序,那么就是它独有的SparkContext。
        JavaSparkContext sc = new JavaSparkContext(conf);
        //  第三步:从数据源(HDFS、本地文件等)加载数据 创建一个RDD
        //  parkContext中根据文件的类型输入源创建的RDD的方法 是textFile()
        //  在java中,创建的普通RDD都叫做JavaRDD
        //  在这里呢,RDD有元素的概念,如果是HDFS或者是本地文件创建的RDD,其每个元素相当于文件的一行数据。
        JavaRDD<String> linesRDD = sc.textFile("data/sparkcore/wordcount");
        //  第四步: 对初始的RDD进行transformation操作
        //  将每一行拆分成单个的单词
        JavaRDD<String> words = linesRDD.flatMap(new FlatMapFunction<String, String>() {
            @Override
            public Iterable<String> call(String line) throws Exception {
                String[] str = line.split(",");
                return Arrays.asList(line.split(","))
            }
        });
        JavaPairRDD<String, Integer> word = words.mapToPair(new PairFunction<String, String, Integer>() {
            @Override
            public Tuple2<String, Integer> call(String s) {
                return new Tuple2<String, Integer>(s, 1);
            }
        });
        JavaPairRDD<String, Integer> result = word.reduceByKey(new Function2<Integer, Integer, Integer>() {
            @Override
            public Integer call(Integer int1, Integer int2) throws Exception {
                return int1 + int2;
            }
        });

        result.foreach(new VoidFunction<Tuple2<String, Integer>>() {
            @Override
            public void call(Tuple2<String, Integer> stringIntegerTuple2) throws Exception {
                System.out.println(stringIntegerTuple2);
            }
        });


    }
}

   提交spark程序到集群运行

  Note:

   1)如果要在spark集群上运行应用程序,需要将本地开发中的 setMaster()删除默认自己连接,同时将本地文件改为      hdfs文件。

   2)对Spark应用程序打成jar包,上传到服务器上

   3)编写spark-submit脚本、执行spark-submit脚本、提交应用程序到集群执行。

 ./spark-submit 
--class com.netcloud.bigdata.spark_core.basiclearning.demo.Demo000_WordCount 
--num-executor 3 \
--driver-memory 100m \
--executor-memory 100m \
--executor-cores 3 \
/opt/wordcount.jar

Note:在这里我们提交程序的时候没有加 --master spark://192.168.2.100:7077 表示任务的执行是在local模式下执行的。

      否则就是在standalone模式下执行。

scala开发

object Demo000_WordCount {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("word_count").setMaster("local[2]")
    val sc = new SparkContext(conf)
    val linesRDD = sc.textFile("data/sparkcore/wordcount.txt")
    val wordsRDD = linesRDD.flatMap(line => line.split(","))
    val wordTuple = wordsRDD.map(word => (word, 1))
    val retRdd = wordTuple.reduceByKey((a: Int, b: Int) => (a + b))
    retRdd.foreach(println)
  }

}

 

  spark-shell开发(主要用于简单的测试)

spark总结

深度剖析wordCount

spark总结

Note:reduceBykey算子产生shuffle,什么是shuffle?shuffle就是数据重新进行分区、落地到磁盘的过程。

5、Spark架构原理

--  Application:用户编写的Spark应用程序。

--  Driver : Driver就是一个进程,我们编写的spark应用程序就在Driver上面;由Driver进程执行。根据任务提交的方式不同Driver所处的节点位置不同;Driver可能处在我们提交spark程序的机器上面,也有可能处在spark集群中的某个节点之上。

--  Master: 它是Spark集群的主进程;负责负责资源的调度和分配、和集群的监控。

-- Worker: 它是Spark集群的进程;通过自己的内存对partition进行存储,启动其他的进程或者线程对Rdd中的partition进行计算。

-- Executor:Executor是work节点的进程,在Executor上面可以启动多个Task线程。

-- Task:在Executor进程上面启动的线程。

Note:

Executor和Task其实就是负责执行,对RDD中的partition进行并行的计算,也就是执行我们对RDD定义的map、flatMap算子操作。

spark总结

6、创建初始的RDD

  1)通过程序中集合的方式创建RDD、主要用于测试使用。 通过parallelizer()方法

  2)使用本地文件创建RDD;主要是临时性处理一些存储大量的文件 textFile(path)方法可以支配到目录加载整个目录的文件。

  3)使用HDFS文件创建RDD;主要用于生产环境;进行离线批处理。 textFile(path)方法

7、RDD算子操作

  转换和行动算子。

 转换算子:它是懒执行的,当真正的触发行动算子后才进行之前的转换计算。

 行动算子:触发真正spark job的操作。 reduce 、collect 、count、first、take、countByKey()、takeSample、foreach、save

 等9种行动算子。

spark总结

spark总结

详细代码链接:https://blog.csdn.net/yangshaojun1992/article/details/88376322

spark总结

详细代码链接:https://blog.csdn.net/yangshaojun1992/article/details/88376963

8、RDD持久化操作

不使用RDD持久化带来的问题:

spark总结

持久化后的情况:

spark总结

spark总结

spark总结

代码实例链接:https://blog.csdn.net/yangshaojun1992/article/details/88377557

9、RDD共享变量

spark总结

共享变量的原理

spark总结

广播变量:

spark总结

详细代码链接:https://blog.csdn.net/yangshaojun1992/article/details/88378882

 

累加器:

spark总结

详细代码链接:https://blog.csdn.net/yangshaojun1992/article/details/88378882

 

相关文章:

  • 2022-12-23
  • 2022-12-23
  • 2022-12-23
  • 2021-12-25
  • 2021-12-20
  • 2021-06-08
  • 2021-07-27
  • 2022-01-02
猜你喜欢
  • 2021-08-17
  • 2021-06-23
  • 2021-09-08
  • 2021-12-03
  • 2021-06-29
  • 2021-06-07
相关资源
相似解决方案