第7章Spark
7.1 sparksql 和 sparkstreaming哪个比较熟
答:都还行,SparkSql 的 DataFrame 或者 DataSet 和 SparkStreaming 的 DStream 都是基于SparkCore 的,最终都会转化为 Sparktask 执行。我们可以交流一下本质的东西 SparkCore,而SparkCore 的核心又是 RDD。
7.2 说一下 sparkshuffle
答:Spark 的 shuffle 也是一处理问题的思想:分而治之。shuffle 一般称为洗牌,一般会有Shuffle。Write 阶段 和 ShuffleRead 阶段。
在 Spark 中实现 Shuffle 的方式有两种,一种是 HashShuffle,一种是 SortShuffle。shuffle 的性能是影响 spark 应用程序性能的关键。shuffle 发生在 stage 之间,stage 中用的 pipline 的计算模式。
7.3 Spark Shuffle 的调优点:

1:Shuffle 的选择 2:缓冲区的大小 3:拉去的数据量的大小 4:间隔时间重试次数。

7.4 缓存这块熟悉吗,介绍缓存级别

答:Spark 的缓存机制是 Spark 优化的一个重要点,它将需要重复使用或者共用的 RDD 缓存在内存中,可以提高 Spark 的性能。
Spark 的底层源码中使用 StorageLevel 来表示缓存机制,其中包括:使用内存,使用磁盘,使用序列化,使用堆外内存。在他的半生对象中基于这几种方式提供了一些实现:不使用缓存,Memory_Only,Disk_only,offHeap 分别都有相应的序列化,副本,组合的实现提供选择。持久化的级别 StorageLevel 可以自定义,但是一般不自定义。如何选择 RDD 的缓存级别的本质是在内存的利用率和 CPU 的利用率之间的权衡。一般默认选择的是 Memory_only, 其次是 Memery_only_Ser, 再次是 Memory_only_and_Dis 至于怎么选择你得自己权衡。

7.5 说一下 cache 和 checkpoint 的区别

答:要知道区别,首先要知道实现的原理和使用的场景 catche 的就是将共用的或者重复使用的 RDD 按照持久化的级别进行缓存 checkpoint 的是将业务场景非常长的逻辑计算的中间结果缓存到 HDFS 上,它的实现原理是:
首先找到 stage 最后的 finalRDD,然后按照 RDD 的依赖关系进行回溯,找到使用了 checkPoint 的 RDD 然后标记这个使用了 checkPoint 的 RDD 重新的启动一个线程来将 checkPoint 之前的 RDD 缓存到 HDFS 上面最后将 RDD 的依赖关系从 checkPoint 的位置切断知道了实现的原理和使用场景后我们就很容易的知道了 catch 和 checkpoint 的区别了。

7.6 spark 运行模式 local local[] local[*] 分别是什么

答:该模式被称为 Local[N] 模式,是用单机的多个线程来模拟 Spark 分布式计算,通常用来验证开发出来的应用程序逻辑上有没有问题其中N代表可以使用 N 个线程,每个线程拥有一个 core 。如果不指定 N,则默认是1个线程(该线程有1个 core )。如果是 local[*],则代表 Run Spark locally with as many worker threads as logical cores on your machine: 在本地运行Spark,与您的机器上的逻辑内核一样多的工作线程。

7.7 Spark 怎么设置垃圾回收机制 ?
Spark 中各个角色的JVM参数设置:
http://blog.csdn.net/wuxb2000/article/details/52870198 1)Driver 的 JVM 参数: GC 方式, 如果是 yarn-client 模式,默认读取的是 spark-class 文件中的 JAVAOPTS;如果是 yarn-cluster 模式,则读取的是 spark-default.conf 文件中的 spark.driver.extraJavaOptions 对应的参数值。 (2)Executor 的 JVM 参数: GC 方式,两种模式都是读取的是 spark-default.conf 文件中的spark.executor.extraJavaOptions 对应的 JVM 参数值。

7.8 一台节点上以 root 用户执行一个 spark 程序,以其他非 root 用户也同时在执行一个 spark 程序,这时以 spark 用户登录,这个节点上,使用 Jps 会看到哪些线程?

答:单独的用户只能看自己的进程
7.9 spark 和 mapreduce 的对比

答:MR 一般处理大量数据的时候一般会存在高延迟,浪费时间,对于一些有时间要求的业务就很不适合。但是如果用 spark 处理的话就非常快了,特别是对于实时动态处理的过程。

下面我会针对人事简历方面的问题总结一下我的想法

7.10 Spark 的提交方式?
答案:不管是提交到 yarn 上面还是提交到 standalone 上都分为 Client 的方式提交和 Cluster 的方式提交。
7.11 Spark 都有什么算子?
答案:map,reducebykey,filter,mapPartition,flatmap,cogroup,foreach,first,take, join, sortBy,distinct,等等
7.12 spark on yarn 和 mapreduce 中 yarn 有什么区别?

答:没有什么区别,yarn 就是一种任务调度框架

7.13 spark 运行的 job 在哪里可以看到?

答: 一般是在 WEBUI 上 查看,如果问具体怎么配置的可以推到运维人员身上

7.14 用scala 写一个 wordcount ?
object ScalaWordCount {
def main(args: Array[String]): Unit = {
val lines = List(“hello java hello python”,“hello scala”,“hello scala hello java hello scala”)
// 切块
val words = lines.flatMap(.split(" "))
// 生成pair
val tuples = words.map((
,1))
// k,v分组
val grouped = tuples.groupBy(.1)
val sumed = grouped.mapValues(
.size)
// 排序
val sorted = sumed.toList.sortBy(
._2)
// 降序
val result = sorted.reverse
println(result)
}
}
7.15 对 scala 的了解 有用过 play framework 框架吗

答:是一个纯java框架,可以进行快速开发,开发周期较短,并且能够快速建立一个java web所需要的所有内容。

7.16 scala 中的隐式函数的关键字?

答:implicit

7.17 val x=y=1 结果是什么?

答:会报错

7.18 编译好的 scala 程序,运行的时候还需要 scala 环境吗?

答:不需要

下面是我在网络上找的思维导图,介绍的比较详细,大家可以认真的看一下
7.19 spark 提交任务的流程

在这里我找了一个网图,相信看图来的更加直接一些。
大数据(spark)面试
7.20 rdd 怎么转 dataFrame
答:可以通过反射的方式来推断元数据,因为 RDD 本身是没有元数据的,通过反射就可以了解这些元数据并且进一步转换成 dtaframe
7.21 问我 spark、jdk、scala 的版本

JDK:1.8 SPARK:2.2.0 SCALA:2.11.8 HADOOP:2.6.5
7.22 spark 和 storm 的区别?

答:storm是对大量的小型数据块进行处理,并且是动态数据 spark一般是对大量数据进行进行全集处理,并且侧重传输数据的过程
7.23 persist 和 checkpoint 的区别

答:persits一般是将数据持久化到磁盘上但是一旦进程被停掉的话,在磁盘上的数据也会同时被清空 而checkpoint 是将 RDD 持久化到 HDFS 上的,如果不手动删除的话是一直存在的
7.24 怎么理解scala的函数式编程
所谓函数式编程,其实就是以纯函数的方式编写代码。
函数式编程,顾名思义,这种编程是以函数思维做为核心,在这种思维的角度去思考问题。这种编程最重要的基础是λ演算(匿名函数),接受函数当作输入和输出。

对于函数式编程,有几个非常重要的和基础性的特征:

1、函数是一等公民:
所谓一等公民是指函数也有数据类型,函数与其他数据类型的变量或值一样,处于平等地位,可以赋值给其它变量,也可以作为函数参数,传入另一个函数,或者作为别的函数的返回值。函数是一等公民是函数式编程范式最重要的特性和基础。
2、不可变数据
所有的状态(或变量)都是不可变的。你可以声明一个状态,但是不能改变这个状态。如果要变化,只能复制一个。纯函数式编程语言不使用任何可变数据结构或变量。但在Scala等编程语言中,即支持不可变的数据结构或变量,也支持可变的。
3、强调函数没有"副作用"
所谓函数没有副作用,意味着函数要保持独立,一旦函数的输入确定,输出就是确定的,函数的执行不会影响系统的状态,不会修改外部状态。想象下,如果函数没有副作用,那函数的执行就可以缓存起来了,一旦函数执行过一次,如果再次执行,当输入和前面一样的情况下,就直接可以用前面执行的输出结果,根本就不用再次运算了,想象下,这个是否可大大提高程序运行的效率。
4、一切皆是表达式
在函数式编程语言中,每一个语句都是一个表达式,都会有返回值。比如scala中的if-else控制结构就是一个有返回值的表达式。这与命令式编程语言中的if-else语句显著不同。这一特性有助于用不可变量编写应用程序。
面向对象编程,这种编程是把问题看作由对象的属性与对象所进行的行为组成。基于对象的概念,以类作为对象的模板,把类和继承作为构造机制,以对象为中心,来思考并解决问题。
7.25 RDD、DataFrame、DataSet的区别
共性:RDD、DataFrame、Dataset全都是spark平台下的分布式弹性数据集,为处理超大型数据提供便利

1)schema信息:
(1)RDD中的数据是没有schema元数据信息
(2)DataFrame和DataSet 都有schema元数据信息。但DataFrame中的数据是弱数据类型,不会做数据类型检查。虽然有schema规定了数据类型,但是编译时是不会报错的,运行时才会报错。DataSet中的数据类型是强数据类型
2)序列化机制:
(1)RDD和DataFrame默认的序列化机制是java的序列化,可以修改为Kyro的机制
(2)DataSet使用自定义的数据编码器进行序列化和反序列化
7.26 spark中driver和worker的通信框架
spark 的 master 和 worker 通过什么方式进行通信的? (D )

A http B nio C netty D Akka
解析:Spark 1.6.0之前使用的是Akka通信框架
Spark 1.6.0之后使用的是netty通信框架
7.27 akka和netty通信框架的区别
一直以来,基于Akka实现的RPC通信框架是Spark引以为豪的主要特性,也是与Hadoop等分布式计算框架对比过程中一大亮点,但是时代和技术都在演化,从Spark1.3.1版本开始,为了解决大数据块(如shuffle)的传输问题,Spark引入了Netty通信框架,到了1.6.0版本,Netty居然完全取代了Akka,承担Spark内部所有的RPC通信以及数据流传输。
那么Netty为什么可以取代Akka?首先不容置疑的是Akka可以做到的,Netty也可以做到,但是Netty可以做到,Akka却无法做到,原因是啥?在软件栈中,Akka相比Netty要Higher一点,它专门针对RPC做了很多事情,而Netty相比更加基础一点,可以为不同的应用层通信协议(RPC,FTP,HTTP等)提供支持。
7.28 spark算子类型,它的工作机制
Transformation(转化)算子和Action(执行)算子。
在Spark中,RDD被表示为对象,通过对象上的方法调用来对RDD进行转换。经过一系列的transformations定义RDD之后,就可以调用actions触发RDD的计算,action可以是向应用程序返回结果(count, collect等),或者是向存储系统保存数据(saveAsTextFile等)。在Spark中,只有遇到action,才会执行RDD的计算(即延迟计算),这样在运行时可以通过管道的方式传输多个转换。
7.29 reduceByKey和groupByKey的区别
通过源码可以发现:
reduceByKey:reduceByKey会在结果发送至reducer之前会对每个mapper在本地进行merge,有点类似于在MapReduce中的combiner。这样做的好处在于,在map端进行一次reduce之后,数据量会大幅度减小,从而减小传输,保证reduce端能够更快的进行结果计算。

groupByKey:groupByKey会对每一个RDD中的value值进行聚合形成一个序列(Iterator),此操作发生在reduce端,所以势必会将所有的数据通过网络进行传输,造成不必要的浪费。同时如果数据量十分大,可能还会造成OutOfMemoryError。

通过以上对比可以发现在进行大量数据的reduce操作时候建议使用reduceByKey。不仅可以提高速度,还是可以防止使用groupByKey造成的内存溢出问题。
7.30 Spark工作机制
(1)用户在client端提交作业后,会由Driver运行main方法并创建spark context上下文。
(2)执行add算子,形成dag图输入dagscheduler
(3)按照add之间的依赖关系划分stage输入task scheduler
(4)task scheduler会将stage划分为taskset分发到各个节点的executor中执行
7.31 spark工作的一个流程?
(或问Spark应用程序的执行过程是什么?)
1)用户在client端提交作业后,会由Driver运行main方法,构建Spark Application的运行环境(启动SparkContext),SparkContext向资源管理器(可以是Standalone、Mesos或YARN)注册并申请运行Executor资源;
2)资源管理器分配Executor资源并启动StandaloneExecutorBackend,Executor运行情况将随着心跳发送到资源管理器上;
3)SparkContext构建成DAG图,将DAG图分解成Stage,并把Taskset发送给Task Scheduler。Executor向SparkContext申请Task,Task Scheduler将Task发放给Executor运行同时SparkContext将应用程序代码发放给Executor;
4)Task在Executor上运行,运行完毕释放所有资源。
7.32 Spark有哪些组件
1)master:管理集群和节点,不参与计算。 
2)worker:计算节点,进程本身不参与计算,和master汇报。 
3)Driver:运行程序的main方法,创建spark context对象。 
4)spark context:控制整个application的生命周期,包括dagsheduler和task scheduler等组件。 
5)client:用户提交程序的入口。

7.33 Spark中Driver和Worker的功能是什么?
1、Driver
1)一个Spark作业运行时包括一个Driver进程,也是作业的主进程,具有main函数,并且有SparkContext的实例,是程序的入口点;
2)功能:负责向集群申请资源,向master注册信息,负责了作业的调度,负责作业的解析、生成Stage并调度Task到Executor上。包括DAGScheduler,TaskScheduler。
2、Worker
主要功能:管理当前节点内存,CPU的使用状况,接收master分配过来的资源指令,通过ExecutorRunner启动程序分配任务,worker就类似于包工头,管理分配新进程,做计算的服务,相当于process服务。
需要注意的是:
1)worker会不会汇报当前信息给master,worker心跳给master主要只有workid,它不会发送资源信息以心跳的方式给mater,master分配的时候就知道work,只有出现故障的时候才会发送资源。
2)worker不会运行代码,具体运行的是Executor是可以运行具体appliaction写的业务逻辑代码,操作代码的节点,它不会运行程序的代码的。
7.34 sparkStreaming UpdateStateByKey底层是如何实现保存数据原来的状态的
可以知道每次调用updateStateByKey都会将旧的状态RDD和当前batch的RDD进行co-group来得到一个新的状态RDD,即使真正需要跟新的数据只有1条也需要将两个RDD进行cogroup,所有的数据都会被计算一遍,而且随着状态的不断增加,运行速度会越来越慢。
7.35 sparkstreaming消费Kafka的偏移量怎么维护
(1) spark streaming里面管理偏移量的策略,默认的spark streaming它自带管理的offset的方式是通过checkpoint来记录每个批次的状态持久化到HDFS中,如果机器发生故障,或者程序故障停止,下次启动时候,仍然可以从checkpoint的目录中读取故障时候rdd的状态,便能接着上次处理的数据继续处理,但checkpoint方式最大的弊端是:如果代码升级,新版本的jar不能复用旧版本的序列化状态,导致两个版本不能平滑过渡,结果就是要么丢数据,要么数据重复,所以官网搞的这个东西,几乎没有人敢在生产环境运行非常重要的流式项目。
(2) 所以比较通用的解决办法就是自己写代码管理spark streaming集成kafka时的offset,自己写代码管理offset,其实就是把每批次offset存储到一个外部的存储系统里面包括(Hbase,HDFS,Zookeeper,Kafka,DB等等),不用的什么存储系统, 都需要考虑到三种时刻的offset的状态,否则offset的状态不完整,就可能导致一些bug出现。
7.36 sparkstreaming的窗口函数
Spark还提供了窗口的计算,它允许你使用一个滑动窗口应用在数据变换中。下图说明了该滑动窗口。

如图所示,每个时间窗口在一个个DStream中划过,每个DSteam中的RDD进入Window中进行合并,操作时生成为
窗口化DSteam的RDD。在上图中,该操作被应用在过去的3个时间单位的数据,和划过了2个时间单位。这说明任
何窗口操作都需要指定2个参数。
window length(窗口长度):窗口的持续时间(上图为3个时间单位)
sliding interval (滑动间隔)- 窗口操作的时间间隔(上图为2个时间单位)。
上面的2个参数的大小,必须是接受产生一个DStream时间的倍数
让我们用一个例子来说明窗口操作。比如说,你想用以前的WordCount的例子,来计算最近30s的数据的中的单词数,10S接受为一个DStream。为此,我们要用reduceByKey操作来计算最近30s数据中每一个DSteam中关于(word,1)的pair操作。它可以用reduceByKeyAndWindow操作来实现。一些常见的窗口操作如下。所有这些操作都需要两个参数— window length(窗口长度)和sliding interval(滑动间隔)。
7.37 SparkStreaming 哪几种方式读取kafka数据 (receiver 和 direct)
1.Spark Streaming接收数据的方式有两种:
1)基于Receiver的方式: offset存储在zookeeper,由Receiver维护,Spark获取数据存入executor中,调用Kafka高阶API。

2)基于Direct的方式:offset自己存储和维护,由Spark维护,且可以从每个分区读取数据,调用Kafka低阶API。

2.Direct方式详解
(1)Direct的方式是会直接操作Kafka底层的元数据信息。
(2)由于直接操作的是Kafka,Kafka就相当于底层的文件系统。
(3)由于底层是直接读取数据,没有所谓的Receiver,直接是周期性(Batch Intervel)的查询Kafka,处理数据的时候,我们会使用基于Kafka原生的Consumer API 来获取Kafka中特定范围(offset范围)中的数据。
(4)读取多个Kafka partition,Spark也会创建RDD的partition,这个时候RDD的partition和kafka的parition是一致的。

(5)不需要开启WAL机制,从数据零丢失的角度来看,极大的提升了效率,还至少能节省一倍的磁盘空间。从Kafka获取数据,比从HDFS获取数据,因为zero copy的方式,速度肯定更快。
3.Direct与Receiver的对比
1)从容错角度
(1)Receiver(高层次的消费者API):在失败的情况下,有些数据很有可能会被处理不止一次。接收到的数据被可靠的保存在WAL中,但是还没来得及更新Zookeeper中的kafka偏移量。导致数据不一致行性:Streaming知道数据被接收,但Kafka认为数据还没被接收。这样系统恢复正常时,Kafka会再一次发送这些数据。At least once
(2)Direct(低层次消费者API):给出每个batch区间需要读取的偏移量位置,每个batch的Job被运行时,对应偏移量的数据从Kafka拉取,偏移量信息也可被可靠地存储(checkpoint),在从失败中恢复可以直接读取这些偏移量信息。Exactly once

Direct API消除了需要使用WAL的Receivers的情况,而且确保每个Kafka记录仅被接收一次并被高效地接收。这就使得我们可以将Spark Streaming和Kafka很好地整合在一起。总体来说,这些特性使得流处理管道拥有高容错性,高效性,而且很容易地被使用。

2)API的使用区别

  1. Direct中task和partition是一一对应的,会多一个线程去拉取数据。

Spark一般指定excutor数据,而不指定机子数
7.38 SparkStreaming 读取kafka时如何保证实时数据不丢失的问题 至多 至少(数据多一点) 精确一次
1、保证数据不丢失(at-least)
spark RDD内部机制可以保证数据at-least语义。
(1)Receiver方式
开启WAL(预写日志),将从kafka中接受到的数据写入到日志文件中,所有数据从失败中可恢复。
(2)Direct方式
依靠checkpoint机制来保证。
保证数据不重复(exactly-once)
2、要保证数据不重复,即Exactly once语义。

  • 幂等操作:重复执行不会产生问题,不需要做额外的工作即可保证数据不重复。
  • 业务代码添加事务操作
    dstream.foreachRDD {(rdd, time) =
    rdd.foreachPartition { partitionIterator =>
    val partitionId = TaskContext.get.partitionId()
    val uniqueId = generateUniqueId(time.milliseconds,partitionId)
    //use this uniqueId to transationally commit the data in partitionIterator
    }
    }
    对每个partitionID,产生一个uniqueID,.只有这个partition的数据被完全消费,才算成功,否则失败回滚。下次若重复执行,就skip
    7.39 spark那些算子是在map端聚合的
    reduceByKey
    7.40 你们spark哪个版本
    spark-2.2.0
    7.41 手写spark wc
    这个题目即考察了你对shell的掌握,又考察了你对scala的了解,还考察了你动手写代码的能力,是比较好的一道题(实际开发中,有些代码是必须要背下来的,烂熟于心,劣等的程序员就是百度+copy,是不可取的)
    val conf = new SparkConf()
    val sc = new SparkContext(conf)
    val line = sc.textFile(“xxxx.txt”) line.flatMap(.split(" ")).map((,1)).reduceByKey(+). collect().foreach(println) sc.stop()
    7.42 对于Spark中的数据倾斜怎么处理
    1)前提是定位数据倾斜,是OOM了,还是任务执行缓慢,看日志,看WebUI
    2)解决方法,有多个方面
    · 避免不必要的shuffle,如使用广播小表的方式,将reduce-side-join提升为map-side-join
    ·分拆发生数据倾斜的记录,分成几个部分进行,然后合并join后的结果
    ·改变并行度,可能并行度太少了,导致个别task数据压力大
    ·两阶段聚合,先局部聚合,再全局聚合
    ·自定义paritioner,分散key的分布,使其更加均匀
    7.43 mr和spark的shuffle的区别
  1. high-level 角度:
    (1)两者并没有大的差别 都是将 mapper(Spark: ShuffleMapTask)的输出进行 partition,不同的 partition 送到不同的 reducer(Spark 里 reducer 可能是下一个 stage 里的 ShuffleMapTask,也可能是 ResultTask)
    Reducer 以内存作缓冲区,边 shuffle 边 aggregate 数据,等到数据 aggregate 好以后进行 reduce()。
    2)low-level 角度:
    (1)Hadoop MapReduce 是 sort-based,进入 combine() 和 reduce() 的 records 必须先 sort。
    (2)好处:combine/reduce() 可以处理大规模的数据
    a.因为其输入数据可以通过外排得到
    b.mapper 对每段数据先做排序
    c.reducer 的 shuffle 对排好序的每段数据做归并
    (3)Spark 默认选择的是 hash-based,通常使用 HashMap 来对 shuffle 来的数据进行 aggregate,不提前排序
    (4)如果用户需要经过排序的数据:sortByKey()
    3)实现角度:
    (1)Hadoop MapReduce 将处理流程划分出明显的几个阶段:map(), spilt, merge, shuffle, sort, reduce()
    (2)Spark 没有这样功能明确的阶段,只有不同的 stage 和一系列的 transformation(),spill, merge, aggregate 等操作需要蕴含在 transformation() 中
    7.44说一下hadoop和spark的shuffle过程
    (1)hadoop:map端保存分片数据,通过网络收集到reduce端
    (2)spark:spark的shuffle是在DAGSchedular划分Stage的时候产生的,TaskSchedule要分发Stage到各个worker的executor,减少shuffle可以提高性能
    7.45 Scala伴生对象
    在Scala中,如果在同一个源码文件中,同时存在使用object声明的类对象(Person)以及使用class声明的类(Person),那么这个类对象就是该类的伴生对象,而这个类就是这个类对象的伴生类。
    注:类和它的伴生对象可以相互访问私有属性或方法,他们必须存在同一个源文件中。必须同名。
    从技术的角度来讲,伴生对象和伴生类其实是两个不同的类,伴生对象所对应的类可以简答的理解为伴生类的辅助工具类。而伴生对象就是这个辅助工具类的单例对象,专门用于处理伴生类中静态资源的相关功能.
    伴生对象既然是一个单例对象,就意味着类已经声明过了,所以,伴生(类)对象是不能声明构造器的
    7.46 解释Scala的模式匹配
    scala模式匹配和Java的switch功能类似,但是功能更加的强大,支持各种类型的匹配方式。
    1、数值和字符串匹配
    val list = List(“aa”, 2, 3)
    for (x <- list) {
    x match {
    case 1 => println(“1”)
    case 2 => println(“2”)
    case 3 => println(“3”)
    case _ => println(“other”)
    }
    }
    /* output:
  • other
  • 2
  • 3
    /
    2、类型匹配
    val list = List(“aa” ,2, new TestClass, 1.0)
    for (x <- list) {
    x match {
    case str : String => println(“String”)
    case int : Int => println(“Int”)
    case a : TestClass => println(“TestClass”)
    case _ : Any => println(“other type”)
    }
    }
    /

    output:
    String
    Int
    TestClass
    other type
    /
    3、数组、集合匹配
    /
    * 数组 */
    val arr = Array(1, 2, 3)
    arr match {
    case Array(1, 2) => println(“case1 : 1, 2”)
    case Array(2, 3, 1) => println(“case2 : 2, 3, 1”)
    case Array(a, b, c) => println(s"case3 : $a, $b, $c")
    case _ => println(“case4 : other”)
    }
    // output: case3 : 1, 2, 3

/** List */
val list = List(1 ,2, 3)
list match {
case List(1, 2) => println(“case1 1, 2”)
case 1 :: 3 :: 2 :: Nil => println(“case2 1, 3, 2”)
case 1 :: 2 :: 3 :: Nil => println(“case3 1, 2, 3”)
case _ => println(“other”)
}
// output: case3 1, 2, 3
4、case类匹配
class NormalClass(val id : Int)
object NormalClass{
def apply(id: Int): NormalClass = new NormalClass(id)

def unapply(arg: NormalClass): Option[Int] = {
if (arg == null)
None
else
Some(arg.id)
}
}
// case类自动帮你生成伴生对象,apply和unapply等方法。效果和上面的一坨代码一样
case class CaseClass(id : Int)

val list = List(NormalClass(1), CaseClass(2))
for (x <- list) x match {
case NormalClass(1) => println(“case1 : NormalClass(1)”)
case NormalClass(2) => println(“case2 : NormalClass(2)”)
case CaseClass(1) => println(“case3 : caseClass(1)”)
case CaseClass(2) => println(“case4 : caseClass(2)”)
case _ => println(“other”)
}
/*

  • output:
  • case1 : NormalClass(1)
  • case4 : caseClass(2)
    */
    7.47 谈谈Scala隐士转换
    隐式转换函数是以implicit关键字声明的带有单个参数的函数。这种函数将会自动应用,将值从一种类型转换为另一种类型。
    7.48 scala中的柯里化
    定义:柯里化指的是将原来接受多个参数的函数变成新的接受一个参数的函数的过程。新的函数返回一个以原有的第二个参数作为参数的函数
       例如:
          def mul(x:Int,y:Int) = x * y //该函数接受两个参数
          def mulOneAtTime(x:Int) = (y:Int) => x * y //该函数接受一个参数生成另外一个接受单个参数的函数
          这样的话,如果需要计算两个数的乘积的话只需要调用:
          mulOneAtTime(5)(4)
        这就是函数的柯里化
    7.49 scala中的闭包
    闭包就是一个函数把外部的那些不属于自己的对象也包含(闭合)进来。

def minusxy(x: Int) = (y: Int) => x – y
还原:
def minusxy(x: Int) ={
def test(y: Int) ={
x – y //用到了外部变量 x
}
test _
}
7.50 spark几种部署模式还有他们的区别
1)本地模式
Spark不一定非要跑在hadoop集群,可以在本地,起多个线程的方式来指定。将Spark应用以多线程的方式直接运行在本地,一般都是为了方便调试,本地模式分三类
·local:只启动一个executor
·local[k]:启动k个executor
·local[*]:启动跟cpu数目相同的 executor
2)standalone模式
分布式部署集群,自带完整的服务,资源管理和任务监控是Spark自己监控,这个模式也是其他模式的基础。
3)Spark on yarn模式
分布式部署集群,资源和任务监控交给yarn管理,但是目前仅支持粗粒度资源分配方式,包含cluster和client运行模式,cluster适合生产,driver运行在集群子节点,具有容错功能,client适合调试,dirver运行在客户端
4)Spark On Mesos模式。
官方推荐这种模式(当然,原因之一是血缘关系)。正是由于Spark开发之初就考虑到支持Mesos,因此,目前而言,Spark运行在Mesos上会比运行在YARN上更加灵活,更加自然。用户可选择两种调度模式之一运行自己的应用程序:
(1)粗粒度模式(Coarse-grained Mode):每个应用程序的运行环境由一个Dirver和若干个Executor组成,其中,每个Executor占用若干资源,内部可运行多个Task(对应多少个“slot”)。应用程序的各个任务正式运行之前,需要将运行环境中的资源全部申请好,且运行过程中要一直占用这些资源,即使不用,最后程序运行结束后,回收这些资源。
(2)细粒度模式(Fine-grained Mode):鉴于粗粒度模式会造成大量资源浪费,Spark On Mesos还提供了另外一种调度模式:细粒度模式,这种模式类似于现在的云计算,思想是按需分配。
7.51 Spark有哪两种算子,为什么遇到action操作的算子才会执行
Transformation(转化)算子和Action(执行)算子。
7.52. Map 和 FlatMap区别 对结果集的影响有什么不同
map:对RDD每个元素转换,文件中的每一行数据返回一个数组对象
flatMap:对RDD每个元素转换,然后再扁平化将所有的对象合并为一个对象,文件中的所有行数据仅返回一个数组对象,会抛弃值为null的值
7.53. 对RDD的理解
答:rdd分布式弹性数据集,简单的理解成一种数据结构,是spark框架上的通用货币。
所有算子都是基于rdd来执行的,不同的场景会有不同的rdd实现类,但是都可以进行互相转换。rdd执行过程中会形成dag图,然后形成lineage保证容错性等。 从物理的角度来看rdd存储的是block和node之间的映射。
7.54. Transformation和action的区别
1,transformation是得到一个新的RDD,方式很多,比如从数据源生成一个新的RDD,从RDD生成一个新的RDD
2,action是得到一个值,或者一个结果(直接将RDDcache到内存中)所有的transformation都是采用的懒策略,就是如果只是将transformation提交是不会执行计算的,计算只有在action被提交的时候才被触发。
7.55. 常用的RDD?
HadoopRDD MapPatitionRDD
1) HadoopRDD : Spark经常需要从hdfs读取文件生成RDD,然后进行计算分析。这种从hdfs读取文件生成的RDD就是HadoopRDD。
2) MapPatitionRDD : 读取文件README.md来创建RDD,然后进行计算分析。这种从hdfs读取文件生成的RDD就是MapPatitionRDD。

scala> val b = sc.textFile(“README.md”)
b: org.apache.spark.rdd.RDD[String] = README.md MapPartitionsRDD[5] at textFile at :2
7.56.对DAG怎么理解的?怎么形成的?划分不同的stage的依据是什么?
DAG(Directed Acyclic Graph)叫做有向无环图。基于RDDs之间的依赖,RDDs会形成一个有向无环图DAG,该DAG描述了整个流式计算的流程,实际执行的时候,RDD是通过血缘关系(Lineage)一气呵成的,即使出现数据分区丢失,也可以通过血缘关系重建分区。
原始的RDD通过一系列的转换就就形成了DAG,根据RDD之间的依赖关系的不同将DAG划分成不同的Stage,对于窄依赖,partition的转换处理在Stage中完成计算。对于宽依赖,由于有Shuffle的存在,只能在parent RDD处理完成后,才能开始接下来的计算,因此宽依赖是划分Stage的依据。

7.57 Spark优化
spark调优比较复杂,但是大体可以分为三个方面来进行
1)平台层面的调优:防止不必要的jar包分发,提高数据的本地性,选择高效的存储格式如parquet,
2)应用程序层面的调优:过滤操作符的优化降低过多小任务,降低单条记录的资源开销,处理数据倾斜,复用RDD进行缓存,作业并行化执行等等,
3)JVM层面的调优:设置合适的资源量,设置合理的JVM,启用高效的序列化方法如kyro,增大off head内存等等
7.58. BroadCast广播变量
Spark一个非常重要的特性就是共享变量

默认情况下,如果在一个算子的函数中使用到了某个外部的变量,那么这个变量的值会被拷贝到每个task中。此时每个task只能操作自己的那份变量副本。如果多个task想要共享某个变量,那么这种方式是做不到的。
Spark为此提供了两种共享变量
Broadcast Variable(广播变量): 会将使用到的变量,仅仅为每个节点拷贝一份,更大的用处是优化性能,减少网络传输以及内存消耗。

  1. Spark提供的Broadcast Variable,是只读的。并且在每个节点上只会有一份副本,而不会为每个task都拷贝一份副本。
  2. 可以通过调用SparkContext的broadcast()方法,来针对某个变量创建广播变量。然后在算子的函数内,使用到广播变量时,每个节点只会拷贝一份副本了。每个节点可以使用广播变量的value()方法获取值

Accumulator(累加变量) :
则可以让多个task共同操作一份变量,主要可以进行累加操作。

  1. Spark提供的Accumulator,主要用于多个节点对一个变量进行共享性的操作。Accumulator只提供了累加的功能。
  2. task只能对Accumulator进行累加操作,不能读取它的值。只有Driver程序可以读取Accumulator的值。
    7.59 什么是RDD宽依赖和窄依赖?
    RDD和它依赖的parent RDD(s)的关系有两种不同的类型
    (1)窄依赖:每一个parent RDD的Partition最多被子RDD的一个Partition使用 (一父一子)
    (2)宽依赖:多个子RDD的Partition会依赖同一个parent RDD的Partition (一父多子)
    7.60 Mapreduce操作的mapper和reducer阶段相当于spark中的哪几个算子?

相当于spark中的map算子和reduceByKey算子。
区别:MR会自动进行排序的,spark要看具体partitioner
7.61 Spark 为什么比mapreduce快
1)基于内存计算,减少低效的磁盘交互;
2)高效的调度算法,基于DAG;
3)容错机制Linage,精华部分就是DAG和Lingae,及时spark不使用内存技术,也大大快于mapreduce。
7.62 Spark中Lineage的基本原理
这里应该是问你Spark的容错机制的原理:
1)Lineage(又称为RDD运算图或RDD依赖关系图)是RDD所有父RDD的graph(图)。它是在RDD上执行transformations函数并创建logical execution plan(逻辑执行计划)的结果,是RDD的逻辑执行计划,记录了RDD之间的依赖关系。
2)使用Lineage实现spark的容错,本质上类似于数据库中重做日志,是容错机制的一种方式,不过这个重做日志粒度非常大,是对全局数据做同样的重做进行数据恢复。
7.63 Spark sql又为什么比hive快呢?
计算引擎不一样,一个是spark计算模型,一个是mapreudce计算模型。
7.64 cache和pesist的区别 
1)cache和persist都是用于将一个RDD进行缓存的,这样在之后使用的过程中就不需要重新计算了,可以大大节省程序运行时间;
2) cache只有一个默认的缓存级别MEMORY_ONLY ,cache调用了persist,而persist可以根据情况设置其它的缓存级别;
3)executor执行的时候,默认60%做cache,40%做task操作,persist最根本的函数,最底层的函数
7.65 Spark中map和mapPartition的区别?
rdd的mapPartitions是map的一个变种,它们都可进行分区的并行处理。
两者的主要区别是调用的粒度不一样:map的输入变换函数是应用于RDD中每个元素,而mapPartitions的输入函数是应用于每个分区。
假设一个rdd有10个元素,分成3个分区。如果使用map方法,map中的输入函数会被调用10次;而使用mapPartitions方法的话,其输入函数会只会被调用3次,每个分区调用1次。
这两个方法的另一个区别是在大数据集情况下的资源初始化开销和批处理处理,如果在map和mapPartition中都要初始化一个耗时的资源,然后使用,比如数据库连接。在上面的例子中,mapPartition只需初始化3个资源(3个分区每个1次),而map要初始化10次(10个元素每个1次),显然在大数据集情况下(数据集中元素个数远大于分区数),mapPartitons的开销要小很多,也便于进行批处理操作。
mapPartitionsWithIndex和mapPartitons类似,只是其参数多了个分区索引号。

相关文章: