1、常见的数据倾斜问题及分析
spark中的数据倾斜: 参考:https://blog.csdn.net/weixin_38750084/article/details/82721319
Hive中的数据倾斜:参考:http://www.itkeyword.com/doc/1175452294333995x418/hive-hive.groupby.skewindata-hive.optimize.skewjoin
2、hive的几种文件存储格式和压缩方式对比
Textfile、Sequencefile、ORC、Parquet、RCfile、Avro
其中Textfile和Sequencefile都是基于行储存的、RCfile集行列储存于一体
1)、压缩时间上除Sequencefile外其他都差不多
2)、数据压缩比例上ORC为最优,其次为Parquet,其他都差不多
3)、就sql查询速度而言,ORC最优,其次为Parqeugt,当今主流也为这两种
3、spark内存分配
spark JVM heap分为三部分:spark memory、user memory、reserved memory
reserved memory:系统预留内存,用于储存spark内部对象,默认大小为300M,一般是固定不变的,在系统运行的时候 Java Heap 的大小至少为 Heap Reserved Memory x 1.5. e.g. 300MB x 1.5 = 450MB 的 JVM配置
user memory:spark中临时数据或自己定义的一些数据的存储空间。如task执行和task执行产生的临时对象,大小为(JVM memory - 300M) x 40%
spark memory:系统框架运行时所需要的空间,由两部分构成,storgae和execution内存两部份组成,一共占(JVM memory - 300M) x 60%,其中storgae和execution又各占50%
storgae memory:主要负责储存 Persist、Cache、Unroll 以及 Broadcast 的数据
execution memory:主要负责shuffle阶段所用的空间,例如join、shuffle都是在这个内存中执行的,当内存不够了会溢写磁盘,减少IO
详细参照:https://blog.csdn.net/qq_37303226/article/details/80774387
4、scala传名调用和传值调用
传值调用在进入函数体之前就对参数表达式进行了计算,传名调用是将为计算的参数表达式传到函数体内需要用到的时候再进行计算
传值调用避免了函数内部多次使用,传名调用的优势在于如果参数在函数体内部没有用到,就不需要在计算了,这种情况下传名调用的效率会高些
5、yarn的资源调度
yarn的三种资源调度器(FIFO Scheduler 队列调度器、Capacity Scheduler 容量调度器、Fair Scheduler 公平调度器)
FIFO:按提交顺序组成一个队列,先进先出,必须等一个任务执行完之后才能执行下一个任务
Capacity:有一个专门的队列来运行小任务,小任务设置一个队列会预先占用一定资源,所以往往导致大任务执行的时间落后于FIFO调度的时间
Fair:不需要预先分配资源,会动态为每个job分配资源,当一个大的job在执行时会占用整个集群的资源,再提交一个小任务时,Fair调度器会分配一半资源给小任务。第二个任务提交时,会有一定延迟,需要等第一个任务释放占用的Container
6、hdfs上传下载流程
7、spark任务运行基本机制
-
driver端对业务逻辑进行解析,记录父RDD到子RDD的业务逻辑转换以 读取数据的位置,并记录算子传递的函数。
-
Driver端解析到action算子的时候,触发任务的提交,此时有向无环图DAG(顶点:RDD 边:依赖关系 反映了RDD之间的依赖关系)已经生成了
-
DAGScheduler对stage进行切分,从FinalRDD向前遍历,如果父RDD与子RDD是窄依赖就加入当前stage,是宽依赖就进行切分,依次进行,直到没有父RDD时终止
-
DAGScheduler开始提交stage,提交时从finalStage开始提交,提交时向前遍历,如果还有父stage没有提交就先提交父stage,依次类推,直到没有父stage,提交时,会对stage生成task(spark任务执行的最小单位),一个stage最后一个RDD有几个分区就会有几个task
-
DAGScheduler会把一个stage中的所有task组装成一个TaskSet,然后交给TaskScheduler去调度
-
TaskScheduler把TaskSet解析成一个一个的Task,并进行序列化发给Excutor端去执行
-
Excutor接收TaskScheduler的task并将其反序列化
-
Excutor端创建一个TaskRunner(线程)并放到线程池,一旦有资源,run方法就会被调用,最终被执行的是task中的runTask方法
到此为止spark才真正以task为单位分布式的运行在集群中
SchedulerBackend:主要负责通信
8、spark sql的解析过程
sqlContext的解析过程:
(1)SQL语句经过SqlParse解析成Unresolved LogicalPlan
(2)使用analyzer结合数据字典(cataqlog)进行绑定,生成resolved LogicalPlan
(3)使用optimizer对resolved LogicalPlan进行优化,生成optimized LogicalPlan
(4)使用SparkPlan将LogicalPlan转换成PhysicalPlan
(5)使用prepareForExecution()将PhysicalPlan转换成可执行物理计划
(6)使用execute()执行物理计划
(7)生成SchemaRDD
但catalyst解析做的有些简陋,很多不支持,所以还是声明的hiveContext对象
hiveContext的解析过程:
(1)SQL语句经过HiveQl.parseSql解析成了Unresolved LogicalPlan
(2)使用analyzer结合hive的metastore进行绑定,生成resolved LogicalPlan
(3)使用optimizer对resolved LogicalPlan进行优化,生成optimized LogicalPlan
(4)使用hivePlanner将LogicalPlan转换成PhysicalPlan
(5)使用prepareForExecution()将PhysicalPlan转换成可执行物理计划
(6)使用execute()执行可执行物理计划
(7)执行后,使用map(_.copy)将结果导入SchemaRDD。 (最终转化为RDD)
9、hive优化问题
9.1 HQL优化问题
(1) hive表join操作时大小表的位置
1、对于多表join,hive会将前面的表缓存在reducer的内存中,然后后面的表会流式的进入reducer和reducer内存中其它的表做join,所以为了防止数据量过大导致oom,将数据量最大的表放到最后,或者通过“/*STREAMTABLE(big table)*/”显示指定reducer流式读入的表
2、开启mapjoin,通常用于一个小表和大表的join,具体表大小由参数hive.mapjoin.smalltable.filesize决定,参数表示小表的大小,默认值为25M, hive 0.7之前,需要使用hint提示*/+mapjoin(table)*/才会执行Mapjoin,否则执行Common Join,但在0.7后,只需设置hive.auto.convert.join=true,就会自动转为mapjoin
参考: https://blog.csdn.net/u013668852/article/details/79768266
https://www.cnblogs.com/yyy-blog/p/7077481.html
https://blog.csdn.net/u012922838/article/details/78220552
https://www.cnblogs.com/wujin/p/6089314.html mapreduce常见的join方案
https://blog.csdn.net/shj1119/article/details/27511115 streamtable关键字
以上两者的区别:
通过/*streamtable(big table)*/指定想要作为流数据的表的join操作在reduce端完成的,而通过开启mapjoin的join操作是在map端完成的,所以默认情况下,进行join操作时,小表放左边,大表放右边。并且开启mapjoin时,能避免数据倾斜,因为没有了reduce,就避免了shuffle
第二种join操作优化方法:
hive.optimize.skewjoin=true;
如果是join过程出现倾斜,应该设置为true
set hive.skewjoin.key=100000;
这个是join的键对应的记录条数超过这个值则会进行优化
简单说就是一个job变为两个job执行HQL
(2) 对于group by的优化
对于含有count(distinct)的情况,容易产生数据倾斜
解决方法:
1、若含有大量空值,可先将空值过滤,count值再加1
2、若存在多个count()值,则需要设置数据负载均衡set hive.groupby.skewindata=ture和hive.map.aggr=true能够解决数据倾斜问题
这样设置时,就启动两个job,具体原理参照:https://blog.csdn.net/chybin500/article/details/80988089
9.2 Hive job优化问题
(1) 开启job并行化执行
hive默认job顺序执行,1个HQL会被拆分成多个job执行,当多个job间没有相互依赖关系时,可以开启并行化执行:set hive.exec.parallel=true
可设置最大允许并行的job数set hive.exec.parallel.thread.number=8;就是控制对于同一个sql来说同时可以运行的job的最大值,该参数默认为8.此时最大可以同时运行8个job
(2) 数据本地化执行(有条件限制,所以一般条件下不用)
前提条件:job的输入数据大小必须小于参数hive.exec.mode.local.auto.inputbytes.max(默认128MB)
job的map数必须小于参数:hive.exec.mode.local.auto.tasks.max(默认为4)太多没有足够的slots
job的reduce数必须为0或1
(3) job合并小文件
set hive.input.format=org.apache.hadoop.hive.ql.io.CombineHiveInputFormat
多个split合成一个,合并split数由mapred.max.split.size限制的大小决定
9.3 Hive map优化问题
设置参数开启map端局部聚合:set hive.map.aggr=true;相当于map端执行combiner
更多hive优化问题参考:https://m.aliyun.com/yunqi/articles/59635
https://blog.csdn.net/yu0_zhang0/article/details/81776459
HQL的编译过程:https://tech.meituan.com/2014/02/12/hive-sql-to-mapreduce.html
10、spark的整体工作机制
11、Hadoop角色SecondNameNode的作用
SecondNameNode并不是NameNode的备份,相当于NameNode的一个助手节点,是一个检查点
NameNode面临的问题:
NameNode用来保存HDFS的元数据信息,NameNode运行时,这些信息是保存在内存中的,但是也可以持久化到磁盘中
保存元数据信息时以依赖两个文件:
1、fsimage—>NameNode启动时对整个文件系统的快照
2、edit logs—>NameNode启动时候,对整个文件系统的改动序列
只有在NameNode重启时,edit logs才会合并到fsimage文件中,但集群中NameNode很少重启,导致edit logs不能及时合并到fsimage中,当NameNode挂掉时,内存中的edit logs会丢失,或者重启NameNode时,要花费很多时间合并文件
解决办法:
SecondNameNode:职责是将NameNode的edits logs合并到fsimage上
具体工作过程:
1、定时到NameNode上获取edit logs,并更新到fsimage(SecondNameNode自身的fsimage)上
2、合并完成后,将fsimage复制到NameNode上
3、NameNode重启时直接使用新的文件,减少启动时间
参考:https://www.cnblogs.com/51python/p/10995243.html