连着把这一块改总结的笔记都写了,时间太紧张了,对不住各位了!????????????

Spark RDD

弹性的分布式数据集,可以理解为一个Java类,里面放的都是数据。RDD代表一个不可变的对元素分区的集合。并且RDD可以被并行计算。

重温大数据---正儿八经Spark再探
Spark RDD特性

  • 分为若干个区
  • 每个分片用一个函数计算
  • RDD直接是一个依赖关系
  • 对于K-V的RDD可指定一个分区,告诉它如何分片
  • 要运行的计算/执行最好在哪几个机器上运行

RDD lineage 有生命线 ,它保存了如何转换得来的信息。

处理RDD split进行计算时,split数据在哪里,我们尽量在那台机器上进行计算,移动计算,不是移动数据。

RDD 操作

RDD的创建

  • Parallelized Collections

重温大数据---正儿八经Spark再探

  • External Datasets

重温大数据---正儿八经Spark再探

重温大数据---正儿八经Spark再探

Transformation
重温大数据---正儿八经Spark再探
Action
重温大数据---正儿八经Spark再探

RDD的依赖

重温大数据---正儿八经Spark再探

  • 窄依赖

重温大数据---正儿八经Spark再探

  • 宽依赖

重温大数据---正儿八经Spark再探

RDD Shuffle

重温大数据---正儿八经Spark再探

The shuffle is Spark’s mechanism for re-distributing data

那些操作会引起Shuffle?

  • 具有重新调整分区操作 ,eg:repartition,coalesce
  • *BeyKey eg: groupByKey ,reduceByKey
  • 关联操作 eg: join ,cogroup

Spark 内核

重温大数据---正儿八经Spark再探

重温大数据---正儿八经Spark再探

DAG调度

阶段的划分是按是否有shuffle来判断的。

重温大数据---正儿八经Spark再探

  • 接收用户提交的job
  • 构建 Stage,记录哪个 RDD 或者 Stage 输出被物化
  • 重新提交 shuffle 输出丢失的 stage
  • 将 Taskset 传给底层调度器

Task 调度

重温大数据---正儿八经Spark再探

  • 提交 taskset( 一组 task) 到集群运行并监控
  • 为每一个 TaskSet 构建一个 TaskSetManager 实例管理这个 TaskSet
    的生命周期
  • 数据本地性决定每个 Task 最佳位置 (process-local, node-local,
    rack-local and then any)
  • 推测执行,碰到 straggle 任务需要放到别的节点上重试出现 shuffle
    输出 lost 要报告 fetch failed 错误

Partition和Task

重温大数据---正儿八经Spark再探

  • Task是Executor中的执行单元
  • Task处理数据常见的两个来源:外部存储以及shuffle数据
  • Task可以运行在集群中的任意一个节点上
  • 为了容错,会将shuffle输出写到磁盘或者内存中

案例

  • 排序

  • Top key

    WordCount

    val rdd = sc.textFile("hdfs://master:8020/user/mapreduce/wordcount/input/wc.input")
    val wordcount = rdd.flatMap(_.split(" ")).map((_,1)).reduceByKey(_ + _)
    wordcount.collect
    wordcount.saveAsTextFile("hdfs://master:8020/user/mapreduce/wordcount/sparkOutput99")
    
    ======result
    (hive,2)
    (mapreduce,1)
    (mapreduce2,1)
    (hadoop,5)
    (hdfs,1)
    发现:
    	Spark 运行WordCoun程序,并没有像MapReduce程序那样,对Key进行排序。
    ## Key Sort
    wordcount.sortByKey().collect    ## 默认情况是 升序
    wordcount.sortByKey(true).collect	
    wordcount.sortByKey(false).collect		
    	
    ===============================================================	
    ------result
    (hive,2)
    (mapreduce,1)
    (mapreduce2,1)
    (hadoop,5)
    (hdfs,1)	
    需求:
    	按照value值进行降序
    ## Value Sort
    wordcount.map(x => (x._2,x._1)).sortByKey(false).collect
    wordcount.map(x => (x._2,x._1)).sortByKey(false).map(x => (x._2,x._1)).collect
    	
    ## Top N
    wordcount.map(x => (x._2,x._1)).sortByKey(false).map(x => (x._2,x._1)).take(3)
    

总结

再水一文。我承认spark是学的真的菜!

相关文章:

  • 2021-09-24
  • 2021-08-20
  • 2021-12-12
  • 2022-01-22
  • 2021-12-21
  • 2021-11-21
  • 2021-06-14
  • 2022-12-23
猜你喜欢
  • 2021-09-14
  • 2021-12-26
  • 2022-12-23
  • 2021-11-23
  • 2021-10-25
  • 2021-11-16
  • 2021-07-03
相关资源
相似解决方案