hadoop配置文件:http://archive.cloudera.com/cdh5/cdh/5/hadoop-2.6.0-cdh5.9.0/

一:Hadoop简介

  总结下起源于Nutch项目,社区贡献最多是Tom White,之后被雅虎关注,发展越来越好,在医疗健康领域和分子生物领域有很多应用

  能做的事:可以搭建一个处理数据的基础平台;、

    1.提高读取速度

      原理:想要读100T数据,在一个硬盘上时间肯定会很长,但是如果将其分布在100个硬盘上,再将硬盘文件共享,此时读取数据的速度就能提升100倍。

        如果要这样实现就需要解决两个主要问题:1)不同硬盘的故障问题,hadoop提供HDFS(Hadoop Distribute FileSystem)分布式文件系统,基本通过保存文件副本的方式,解决出现硬盘故障问题

                           2)确保每个硬盘拿来的数据正确,提供了MapReduce(一个编程模式)抽象出这些硬盘读写问题并将其转换为map和reduce两部分

  官网主页:http://hadoop.apache.org

  资源库:http://hadoopbook.com

  Hadoop初始版本:Hadoop Common(基础模块,网络通信);Hadoop HDFS( 分布式存储);Hadoop MapReduce(分布式计算)

  Hadoop后来版本:多了一个Hadoop YARN(负责资源管理,资源调度,类似Hadoop的操作系统),基于这个层面有了很多应用层面的框架出现(HIVE,Strom,Spark,Flink,MapReduce)

二:MapReduce

  通过例子:从国家天气数据中,找到每年的最高气温,文件都是以日志二进制形式保存;

  对于这种情况,就非常适合用Hodoop的MapReduce来解决了,主要解题思路:先将每年日志文件通过map函数变为特定集合,再通过reduce函数,在每个map中元素做reduce函数处理这里是取最大值,这样mapreduce就找到每年的最大气温了

  Hadoop Streaming 是MapReduce的API

  概述: 

  <1>将分布式计算作业拆分成两个阶段:Mapper和Reducer
  <2>Shuffle流程:连接Mapper和Reducer阶段
    I.shuffle写入流程
    mapper任务将输出数据写到本地磁盘上
    II.shuffle读取流程
    reducer任务从mapper磁盘上远程读取数据信息
  <3>使用场景:离线批处理,速度慢
  <4>缺点:各个task任务需要不断申请释放资源,过多使用磁盘

  流程图:

跟着我一起学习大数据——Hadoop   

  <1>输入文件切片
  <2>mapper进程处理切片
  <3>shuffle流程
  <4>reducer进程聚合数据
  <5>输出文件

  2.1 具体聊聊

    1)案例运行方式  

      a.单机运行
      <1>导入window支持的两个文件:winutils.exe和hadoop.dll放到C:\java\hadoop-2.6.0-cdh5.9.0\bin目录下
      <2>配置HADOOP_HOME环境变量(需要重启机器)
        临时配置环境变量:System.setProperty("hadoop.home.dir","C:\java\hadoop-2.6.0-cdh5.9.0");
      <3>修改NativeIO类,将access0调用处直接换成true

    WardCount例子:单机运行java方式mapreduce方式

package org.mapreduce.test;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.SequenceFile.CompressionType;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.compress.GzipCodec;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;

public class WordCount {
    
    //临时配置HADOOP_HOME环境变量
    static {
        System.setProperty("hadoop.home.dir", "E:\\hadoop-2.6.0-cdh5.9.0");
    }
    
    /**
     * 默认MapReduce是通过TextInputFormat进行切片,并交给Mapper进行处理
     * TextInputFormat:key:当前行的首字母的索引,value:当前行数据
     * Mapper类参数:输入key类型:Long,输入Value类型:String,输出key类型:String,输出Value类型:Long
     * MapReduce为了网络传输时序列化文件比较小,执行速度快,对基本类型进行包装,实现自己的序列化
     * @author Administrator
     *
     */
    public static class MyMapper extends Mapper<LongWritable, Text, Text, LongWritable> {

        LongWritable one = new LongWritable(1);
        
        /**
         * 将每行数据拆分,拆分完输出每个单词和个数
         */
        @Override
        protected void map(LongWritable key, Text value, Mapper<LongWritable, Text, Text, LongWritable>.Context context)
                throws IOException, InterruptedException {
            String words = value.toString();
            //将每行数据拆分成各个单词
            String[] wordArr = words.split(" ");
            //遍历各个单词
            for (String word : wordArr) {
                //输出格式<单词,1>
                context.write(new Text(word), one);
            }
        }
        
    }
    
    /**
     * 进行全局聚合
     * Reducer参数:输入key类型:String,输入Value类型:Long,输出key类型:String,输出Value类型:Long
     * @author Administrator
     *
     */
    public static class MyReducer extends Reducer<Text, LongWritable, Text, LongWritable> {

        /**
         * 将map输出结果进行全局聚合
         * key:单词, values:个数[1,1,1]
         */
        @Override
        protected void reduce(Text key, Iterable<LongWritable> values,
                Reducer<Text, LongWritable, Text, LongWritable>.Context context) throws IOException, InterruptedException {
            Long sum = 0L;
            for (LongWritable value : values) {
                //累加单词个数
                sum += value.get();
            }
            //输出最终数据结果
            context.write(key, new LongWritable(sum));
        }

        
        
    }
    
    /**
     * 驱动方法
     * @param args
     * @throws IllegalArgumentException
     * @throws IOException
     * @throws ClassNotFoundException
     * @throws InterruptedException
     */
    public static void main(String[] args) throws IllegalArgumentException, IOException, ClassNotFoundException, InterruptedException {
        //0.创建一个Job
        Configuration conf = new Configuration();
        //连接hadoop环境
//        conf.set("fs.defaultFS", "hdfs://hadoop-senior01.test.com:8020");
        
        Job job = Job.getInstance(conf, "word-count");
        //通过类名打成jar包
        job.setJarByClass(WordCount.class);
        
        //1.输入文件
        FileInputFormat.addInputPath(job, new Path(args[0]));
        //2.编写mapper处理逻辑
        job.setMapperClass(MyMapper.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(LongWritable.class);
        //3.shuffle流程(暂时不用处理)
        
        //4.编写reducer处理逻辑
        job.setReducerClass(MyReducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(LongWritable.class);
        //5.输出文件
        FileOutputFormat.setOutputPath(job, new Path(args[1]));
        
        //6.运行Job
        boolean result = job.waitForCompletion(true);
        System.out.println(result ? 1 : 0);
    }
    
}
View Code

相关文章: