hadoop配置文件:http://archive.cloudera.com/cdh5/cdh/5/hadoop-2.6.0-cdh5.9.0/
一:Hadoop简介
总结下起源于Nutch项目,社区贡献最多是Tom White,之后被雅虎关注,发展越来越好,在医疗健康领域和分子生物领域有很多应用
能做的事:可以搭建一个处理数据的基础平台;、
1.提高读取速度
原理:想要读100T数据,在一个硬盘上时间肯定会很长,但是如果将其分布在100个硬盘上,再将硬盘文件共享,此时读取数据的速度就能提升100倍。
如果要这样实现就需要解决两个主要问题:1)不同硬盘的故障问题,hadoop提供HDFS(Hadoop Distribute FileSystem)分布式文件系统,基本通过保存文件副本的方式,解决出现硬盘故障问题
2)确保每个硬盘拿来的数据正确,提供了MapReduce(一个编程模式)抽象出这些硬盘读写问题并将其转换为map和reduce两部分
官网主页:http://hadoop.apache.org
资源库:http://hadoopbook.com
Hadoop初始版本:Hadoop Common(基础模块,网络通信);Hadoop HDFS( 分布式存储);Hadoop MapReduce(分布式计算)
Hadoop后来版本:多了一个Hadoop YARN(负责资源管理,资源调度,类似Hadoop的操作系统),基于这个层面有了很多应用层面的框架出现(HIVE,Strom,Spark,Flink,MapReduce)
二:MapReduce
通过例子:从国家天气数据中,找到每年的最高气温,文件都是以日志二进制形式保存;
对于这种情况,就非常适合用Hodoop的MapReduce来解决了,主要解题思路:先将每年日志文件通过map函数变为特定集合,再通过reduce函数,在每个map中元素做reduce函数处理这里是取最大值,这样mapreduce就找到每年的最大气温了
Hadoop Streaming 是MapReduce的API
概述:
<1>将分布式计算作业拆分成两个阶段:Mapper和Reducer
<2>Shuffle流程:连接Mapper和Reducer阶段
I.shuffle写入流程
mapper任务将输出数据写到本地磁盘上
II.shuffle读取流程
reducer任务从mapper磁盘上远程读取数据信息
<3>使用场景:离线批处理,速度慢
<4>缺点:各个task任务需要不断申请释放资源,过多使用磁盘
流程图:
<1>输入文件切片
<2>mapper进程处理切片
<3>shuffle流程
<4>reducer进程聚合数据
<5>输出文件
2.1 具体聊聊
1)案例运行方式
a.单机运行
<1>导入window支持的两个文件:winutils.exe和hadoop.dll放到C:\java\hadoop-2.6.0-cdh5.9.0\bin目录下
<2>配置HADOOP_HOME环境变量(需要重启机器)
临时配置环境变量:System.setProperty("hadoop.home.dir","C:\java\hadoop-2.6.0-cdh5.9.0");
<3>修改NativeIO类,将access0调用处直接换成true
WardCount例子:单机运行java方式mapreduce方式
package org.mapreduce.test; import java.io.IOException; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.SequenceFile.CompressionType; import org.apache.hadoop.io.Text; import org.apache.hadoop.io.compress.GzipCodec; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.Mapper; import org.apache.hadoop.mapreduce.Reducer; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat; public class WordCount { //临时配置HADOOP_HOME环境变量 static { System.setProperty("hadoop.home.dir", "E:\\hadoop-2.6.0-cdh5.9.0"); } /** * 默认MapReduce是通过TextInputFormat进行切片,并交给Mapper进行处理 * TextInputFormat:key:当前行的首字母的索引,value:当前行数据 * Mapper类参数:输入key类型:Long,输入Value类型:String,输出key类型:String,输出Value类型:Long * MapReduce为了网络传输时序列化文件比较小,执行速度快,对基本类型进行包装,实现自己的序列化 * @author Administrator * */ public static class MyMapper extends Mapper<LongWritable, Text, Text, LongWritable> { LongWritable one = new LongWritable(1); /** * 将每行数据拆分,拆分完输出每个单词和个数 */ @Override protected void map(LongWritable key, Text value, Mapper<LongWritable, Text, Text, LongWritable>.Context context) throws IOException, InterruptedException { String words = value.toString(); //将每行数据拆分成各个单词 String[] wordArr = words.split(" "); //遍历各个单词 for (String word : wordArr) { //输出格式<单词,1> context.write(new Text(word), one); } } } /** * 进行全局聚合 * Reducer参数:输入key类型:String,输入Value类型:Long,输出key类型:String,输出Value类型:Long * @author Administrator * */ public static class MyReducer extends Reducer<Text, LongWritable, Text, LongWritable> { /** * 将map输出结果进行全局聚合 * key:单词, values:个数[1,1,1] */ @Override protected void reduce(Text key, Iterable<LongWritable> values, Reducer<Text, LongWritable, Text, LongWritable>.Context context) throws IOException, InterruptedException { Long sum = 0L; for (LongWritable value : values) { //累加单词个数 sum += value.get(); } //输出最终数据结果 context.write(key, new LongWritable(sum)); } } /** * 驱动方法 * @param args * @throws IllegalArgumentException * @throws IOException * @throws ClassNotFoundException * @throws InterruptedException */ public static void main(String[] args) throws IllegalArgumentException, IOException, ClassNotFoundException, InterruptedException { //0.创建一个Job Configuration conf = new Configuration(); //连接hadoop环境 // conf.set("fs.defaultFS", "hdfs://hadoop-senior01.test.com:8020"); Job job = Job.getInstance(conf, "word-count"); //通过类名打成jar包 job.setJarByClass(WordCount.class); //1.输入文件 FileInputFormat.addInputPath(job, new Path(args[0])); //2.编写mapper处理逻辑 job.setMapperClass(MyMapper.class); job.setMapOutputKeyClass(Text.class); job.setMapOutputValueClass(LongWritable.class); //3.shuffle流程(暂时不用处理) //4.编写reducer处理逻辑 job.setReducerClass(MyReducer.class); job.setOutputKeyClass(Text.class); job.setOutputValueClass(LongWritable.class); //5.输出文件 FileOutputFormat.setOutputPath(job, new Path(args[1])); //6.运行Job boolean result = job.waitForCompletion(true); System.out.println(result ? 1 : 0); } }