1.自定义groupingCompatator实现分组求top1 topN
Order_0000001 Pdt_01 222.8
Order_0000001 Pdt_05 25.8
Order_0000002 Pdt_03 522.8
Order_0000002 Pdt_04 122.4
Order_0000002 Pdt_05 722.4
Order_0000003 Pdt_01 222.8
求取每给订单当中,金额最大的那个商品的价格是多少
求前top2,出来了两条数据,不满足我们每个组当中取前两个最大的值
select max(price)
from order o group by o.orderid order by price desc limit 2
top2表示的是每个订单当中最大的前两个值分组求topN
Order_0000001 Pdt_01 222.8
Order_0000001 Pdt_05 25.8
Order_0000002 Pdt_05 722.4
Order_0000002 Pdt_03 522.8
Order_0000003 Pdt_01 222.8
如何通过mapreduce来实现
如何通过MR实现分组求top1
第一个问题:求金额的最大值,要排序,按照金额来进行排序
第二个问题:如何区分不同的订单,,通过订单id来区分我们不同的订单,相同的订单,发送到同一个reduce里面去,形成一个集合
分区与分组的关系:
分区:主要是决定了我们的数据去往哪一个reduce
分组:决定了哪些数据分到一个组里面去,形成一个集合,一起调用一次reduce逻辑
2.Mapreduce的其他补充
2.1多job串联
2.2Mapreduce的参数优化
Mapreduce提交任务过程
虚拟内核:根据我们CPU的型号,CPU的类型,虚拟出来的一些核数
以下调整参数都在mapred-site.xml这个配置文件当中有
//以下参数是在用户自己的mr应用程序中配置就可以生效
(1) mapreduce.map.memory.mb: 一个Map Task可使用的资源上限(单位:MB),默认为1024。如果Map Task实际使用的资源量超过该值,则会被强制杀死。
(2) mapreduce.reduce.memory.mb: 一个Reduce Task可使用的资源上限(单位:MB),默认为1024。如果Reduce Task实际使用的资源量超过该值,则会被强制杀死。
(3) mapred.child.java.opts 配置每个map或者reduce使用的内存的大小,默认是200M
(4) mapreduce.map.cpu.vcores: 每个Map task可使用的最多cpu core数目, 默认值: 1
(5) mapreduce.reduce.cpu.vcores:每个Reduce task可使用的最多cpu core数目, 默认值: 1
virtual 虚拟的
//shuffle性能优化的关键参数,应在yarn启动之前就配置好
(6)mapreduce.task.io.sort.mb 100
//shuffle的环形缓冲区大小,默认100m
(7)mapreduce.map.sort.spill.percent 0.8
//环形缓冲区溢出的阈值,默认80%
//应该在yarn启动之前就配置在服务器的配置文件中才能生效
以下配置都在yarn-site.xml配置文件当中配置
(8) yarn.scheduler.minimum-allocation-mb 1024 给应用程序container分配的最小内存
(9) yarn.scheduler.maximum-allocation-mb 8192 给应用程序container分配的最大内存
(10)yarn.scheduler.minimum-allocation-vcores 1 container最小的虚拟内核的个数
(11)yarn.scheduler.maximum-allocation-vcores 32 container最大的虚拟内核的个数
(12)yarn.nodemanager.resource.memory-mb 8192 每个nodemanager给多少内存
2.3容错相关参数
(1) mapreduce.map.maxattempts: 每个Map Task最大重试次数,一旦重试参数超过该值,则认为Map Task运行失败,默认值:4。
(2) mapreduce.reduce.maxattempts: 每个Reduce Task最大重试次数,一旦重试参数超过该值,则认为Map Task运行失败,默认值:4。
(3) mapreduce.job.maxtaskfailures.per.tracker:当失败的Map Task失败比例超过该值为,整个作业则失败,默认值为0. 如果你的应用程序允许丢弃部分输入数据,则该该值设为一个大于0的值,比如5,表示如果有低于5%的Map Task失败(如果一个Map Task重试次数超过mapreduce.map.maxattempts,则认为这个Map Task失败,其对应的输入数据将不会产生任何结果),整个作业仍认为成功。
(5)mapreduce.task.timeout: Task超时时间,默认值为600000毫秒,经常需要设置的一个参数,**该参数表达的意思为:如果一个task在一定时间内没有任何进入,即不会读取新的数据,也没有输出数据,则认为该task处于block状态,可能是卡住了,也许永远会卡主,为了防止因为用户程序永远block住不退出,则强制设置了一个该超时时间(单位毫秒)。**如果你的程序对每条输入数据的处理时间过长(比如会访问数据库,通过网络拉取数据等),建议将该参数调大,该参数过小常出现的错误提示是“AttemptID:attempt_14267829456721_123456_m_000224_0 Timed out after
300 secsContainer killed by the ApplicationMaster.”。
2.4本地运行mapreduce 作业
设置以下几个参数: file:///
mapreduce.framework.name=local
mapreduce.jobtracker.address=local
fs.defaultFS=local
2.5效率和稳定性相关参数
(1) mapreduce.map.speculative: 是否为Map Task打开推测执行机制,默认为true,如果为true,如果Map执行时间比较长,那么集群就会推测这个Map已经卡住了,会重新启动同样的Map进行并行的执行,哪个先执行完了,就采取哪个的结果来作为最终结果,(只会造成集群资源的更加紧张)一般直接关闭推测执行
一个任务为什么会长时间的没有执行完成???有可能是因为数据的倾斜
数据倾斜:大量的数据都涌到同一个reducetask里面去了,造成一个reducetask里面处理的数据量太大,迟迟地不能够完成
(2) mapreduce.reduce.speculative: 是否为Reduce Task打开推测执行机制,默认为true,如果reduce执行时间比较长,那么集群就会推测这个reduce已经卡住了,会重新启动同样的reduce进行并行的执行,哪个先执行完了,就采取哪个的结果来作为最终结果,一般直接关闭推测执行
(3) mapreduce.input.fileinputformat.split.minsize:
FileInputFormat做切片时的最小切片大小,默认为0
(切片的默认大小就等于blocksize,即 134217728)
3.yarn当中的主要组件以及yarn集群架构介绍
3.1yarn的资源调度管理:
yarn是我们hadoop2.x当中引进的一个新的模块,主要用于管理我们集群当中的资源
比如说:内存,cpu
yarn主要就是为了调度资源,管理任务等
yarn的调度可以分为两个层级来说
一级管理调度:
管理计算机的资源(CPU,内存,网络IO,磁盘)
运行的job任务的生命周(每一个应用执行的情况,都需要汇报给ResourceManager)
二级管理调度:
任务的计算模型 (AppMaster的任务精细化管理)
多样化的计算模型 storm spark
yarn的官网文档说明:
http://hadoop.apache.org/docs/r2.7.5/hadoop-yarn/hadoop-yarn-site/YARN.html
3.2yarn集群当中各个组件的作用:
resourceManager:主节点,主要用于接收用户的请求,分配资源
nodeManager:从节点,主要用于处理任务的计算
ApplicationMaster:每提交一个任务,启动一个appmaster,
这个appmaster全权负责管理我们的任务的执行
主要职责:申请资源
分配资源(分配container)
监控任务执行的进度状况
回收资源
与resourceManager通信,报告任务的执行状况
自杀,
Container:资源分配的单位,所有的资源都是以container的形式来进行划分的,便于资源的分配和回收
jobHistory:历史完成的任务的日志信息
TimeLineServer: 2.4版本以后出来的新特性,查看正在执行的任务的信息
yarn的发展历程以及详细介绍:
https://www.ibm.com/developerworks/cn/opensource/os-cn-hadoop-yarn/
resourceManager主要作用:
处理客户端请求
启动/监控ApplicationMaster
监控NodeManager
资源分配与调度
NodeManager主要作用:
单个节点上的资源管理和任务管理
接收并处理来自resourceManager的命令
接收并处理来自ApplicationMaster的命令
管理抽象容器container
定时向RM汇报本节点资源使用情况和各个container的运行状态
ApplicationMaster主要作用:
数据切分
为应用程序申请资源
任务监控与容错
负责协调来自ResourceManager的资源,开通NodeManager监视容的执行和资源使用(CPU,内存等的资源分配)
Container主要作用:
对任务运行环境的抽象
任务运行资源(节点,内存,cpu)
任务启动命令
任务运行环境
yarn的架构
yarn当中的调取器
调度器主要解决的是任务先后提交,如何保证任务最快执行的一种策略
研究的是任务之间如何一起执行的问题
队列 :两端都开口,先进先出
栈:一端开口,一段封闭,先进后出,后进先出
hadoop当中的调度器主要由三种
1.fifo 队列调度器 ,first in first out (没人用)
2.capacity scheduler 容量调度器 apache的hadoop版本默认使用的
容量调度器:将我们集群的资源,划分成好几个队列
30% 40% 30%
3 4 3
任务提交的时候,可以选择不同的队列来进行提交
根据提交的任务需要的资源大小不同,可以将我们的任务,划分到不同的队列下面去
3.fairScheduler 公平调度器 CDH版本的hadoop默认的调度规则
如果没有任务提交,第一个任务过来,将集群当中的所有的资源全部给第一个任务
第二个任务来了,将第一个任务的资源划分一点出来给第二个任务,保证第二个任务也可以执行
保证每一个任务都可以公平的一起执行
一般调度器不会去更改
yarn当中常用的参数设置
定义交换区空间可以使用的大小(交换区空间就是讲一块硬盘拿出来做内存使用)
这里指定的是nodemanager的n内存的2.1倍
yarn.nodemanager.vmem-pmem-ratio 2.1