   The ResourceManager source code lives in the hadoop-yarn-server-resourcemanager Maven artifact:

<dependency>
    <groupId>org.apache.hadoop</groupId>
    <artifactId>hadoop-yarn-server-resourcemanager</artifactId>
    <version>2.7.3</version>
</dependency>

RPC

Remote Procedure Call.

IPC

Inter-Process Communication, typically implemented over sockets and I/O channels (Channel | Socket).

PB

Data/serialization formats:
avro
json
xml
Protobuf: Google's serialization system; the marshalling code is auto-generated from .proto definitions.
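
A minimal sketch of what using a protobuf-generated class looks like in Java. It assumes a hypothetical person.proto with "message Person { string name = 1; int32 age = 2; }" compiled by protoc; the Person class below is that generated code, not something in this project:

    //serialize: build the message and turn it into bytes
    Person p = Person.newBuilder().setName("tom").setAge(12).build();
    byte[] bytes = p.toByteArray();
    //deserialize: parse the bytes back into a message
    Person copy = Person.parseFrom(bytes);
    System.out.println(copy.getName() + ":" + copy.getAge());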

MR job submission process

1.Local
    mapreduce.Job
        --> JobSubmitter.submitJobInternal()
                -->preparation (check output directory, upload job files)
                -->LocalJobRunner.submitJob();
                    -->converts the mapreduce.Job into a LocalJobRunner$Job (a thread) and starts it
                    [spawns a new thread]
                    --------------------
                    LocalJobRunner$Job.run()
                            -->reads the split metainfo, creates List<LocalJobRunner$Job$MapTaskRunnable>
                            -->creates a thread pool
                            -->runs the map tasks on the thread pool.
                            [spawns new threads to run the map tasks]
                            ---------------------------
                                LocalJobRunner$Job$MapTaskRunnable.run()
                                    --> new MapTask()
                                    --> MapTask.runNewMapper()
                                    --> creates WordCountMapper(..) via reflection
                                    --> WordCountMapper.run();
                                        setup();
                                        for(){
                                            map(...)
                                        }
                                        cleanup();
                            [spawns new threads to run the reduce tasks]
                            ---------------------------
                                LocalJobRunner$Job$ReduceTaskRunnable.run()
                                    --> new ReduceTask()
                                    --> ReduceTask.runNewReducer()
                                    --> creates WordCountReducer(..) via reflection
                                    --> WordCountReducer.run();
                                        setup();
                                        for(){
                                            reduce(...)
                                        }
                                        cleanup();
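
The setup() / map-loop / cleanup() sequence sketched above is the Mapper.run() template method; in Hadoop 2.x it looks roughly like this:

    public void run(Context context) throws IOException, InterruptedException {
        setup(context);
        try {
            //the RecordReader drives this loop via nextKeyValue()
            while (context.nextKeyValue()) {
                map(context.getCurrentKey(), context.getCurrentValue(), context);
            }
        } finally {
            cleanup(context);
        }
    }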


2.Fully distributed (YARN)
    Job             //MapReduce-side notion
    Application     //YARN-side notion
    1.Run the job
    2.Obtain an application ID from the RM
    3.Upload resources to HDFS
        job.xml
        job.jar
        job.split
        job.splitmetainfo

    4.Submit the application to the RM
    5.Create the MRAppMaster
        5.a)the RM contacts an NM
        5.b)the NM launches the MRAppMaster
    6.The AppMaster initializes the job
    7.The AppMaster retrieves the split info from HDFS
    8.The AppMaster requests resources from the RM; the RM returns a list of containers
    9.Using that list, the AppMaster locates the corresponding hosts and contacts their NMs
    10.Each NM launches a YarnChild in a new JVM; the YarnChild runs a MapTask | ReduceTask

3.The underlying data transport is implemented with Google protobuf serialization combined with the protobuf RPC engine (ProtobufRpcEngine).
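
A sketch of how the protobuf engine is selected per protocol (MyProtocolPB below is a hypothetical protobuf-generated protocol interface; RPC and ProtobufRpcEngine are in org.apache.hadoop.ipc):

    Configuration conf = new Configuration();
    //tell the RPC layer to use the protobuf-based engine for this protocol
    RPC.setProtocolEngine(conf, MyProtocolPB.class, ProtobufRpcEngine.class);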

combiner

A map-side "reduce": pre-aggregates map output before the shuffle to improve performance.
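
A minimal sketch, reusing the word-count reducer from this note as the combiner (valid because summing counts is commutative and associative):

    //run a map-side pre-aggregation before the shuffle
    job.setCombinerClass(WordCountReducer.class);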

partition

Determines which reducer a given map output key is sent to (default: HashPartitioner; see the data-skew section below).

split

A split contains only offset and length information, not the data itself; the number of splits equals the number of mappers.
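
A small sketch of reading that metadata inside a mapper (the split describes where the data lives, not the data itself):

    FileSplit split = (FileSplit) context.getInputSplit();
    Path file   = split.getPath();     //which file
    long start  = split.getStart();    //byte offset of this split
    long length = split.getLength();   //number of bytes in this split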

Handling data skew

1.Redesign the key
2.Custom partition function (replacing the default HashPartitioner; see the sketch after this list)
3.Shuffle: re-mix/redistribute the data
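
A sketch of option 2, assuming a single known hot key (the class name and the key value are hypothetical). Scattering a hot key changes which reducer sees which records, so it usually requires a second aggregation job afterwards:

    import org.apache.hadoop.io.IntWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Partitioner;

    /**
     * Hypothetical partitioner that spreads one known hot key across all reducers.
     */
    public class SkewAwarePartitioner extends Partitioner<Text, IntWritable> {
        @Override
        public int getPartition(Text key, IntWritable value, int numPartitions) {
            if ("hotKey".equals(key.toString())) {
                //spread the hot key over all reducers instead of one
                return (int) (Math.random() * numPartitions);
            }
            //default HashPartitioner behaviour for all other keys
            return (key.hashCode() & Integer.MAX_VALUE) % numPartitions;
        }
    }
    //registered with: job.setPartitionerClass(SkewAwarePartitioner.class);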

RpcKind values

1.RPC_BUILTIN
2.RPC_WRITABLE
3.RPC_PROTOCOL_BUFFER

Hadoop IPC (inter-process communication) + RPC (remote procedure call)

1.Define the interface
    package com.it18zhang.hadoop.ipc;

    import org.apache.hadoop.ipc.VersionedProtocol;

    /**
     * RPC protocol: a versioned Hadoop IPC interface.
     */
    public interface IHelloWorldService extends VersionedProtocol {
         static final long versionID = 1 ;
        public String sayHello(String name);
    }

2.Implement the interface
    package com.it18zhang.hadoop.ipc;

    import org.apache.hadoop.ipc.ProtocolSignature;

    import java.io.IOException;

    /**
     * Server-side implementation of IHelloWorldService.
     */
    public class HelloWorldServiceImpl implements IHelloWorldService {

        public String sayHello(String name) {
            System.out.println(name);
            return "hello " + name;
        }

        /**
         * Protocol version: report the version declared on the interface.
         */
        public long getProtocolVersion(String protocol, long clientVersion) throws IOException {
            return versionID;
        }

        /**
         * Protocol signature (not used in this simple example).
         */
        public ProtocolSignature getProtocolSignature(String protocol, long clientVersion, int clientMethodsHash) throws IOException {
            return null;
        }
    }

3.Create the server
    package com.it18zhang.hadoop.ipc;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.ipc.RPC;

    import java.io.IOException;

    /**
     * Starts an RPC server that exposes IHelloWorldService on localhost:8888.
     */
    public class MyServer {
        public static void main(String[] args) throws IOException {
            new RPC.Builder(new Configuration())
                    .setProtocol(IHelloWorldService.class)
                    .setInstance(new HelloWorldServiceImpl())
                    .setBindAddress("localhost")
                    .setPort(8888)
                    .build().start();
        }
    }

4.Create the client
    package com.it18zhang.hadoop.ipc;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.ipc.RPC;

    import java.net.InetSocketAddress;

    /**
     * Obtains an RPC proxy for IHelloWorldService and calls sayHello().
     */
    public class MyClient {
        public static void main(String[] args) throws  Exception {
            IHelloWorldService s = RPC.getProxy(IHelloWorldService.class,
                    IHelloWorldService.versionID,
                    new InetSocketAddress("localhost", 8888),
                    new Configuration());
            String msg = s.sayHello("tom");
            System.out.println(msg);
        }
    }

Carriage return

CR          //carriage return, \r

LineFeed

LF          //line feed, \n

Killing a process on Windows

taskkill /F /PID xxx

Client-side InputFormat / RecordReader call flow

Client -> FileInputFormat.getSplits() -> TextInputFormat.isSplitable(..)

org.apache.hadoop.mapreduce.lib.input.TextInputFormat --> createRecordReader() --> LineRecordReader --> nextKeyValue()

InputFormat defines getSplits() and createRecordReader() (FileInputFormat additionally provides isSplitable()); getSplits()/isSplitable() are called on the client while the job input is being prepared.
The RecordReader sits in front of the mapper; the mapper loop calls the RecordReader's nextKeyValue() repeatedly.
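
For reference, the new-API contract (simplified) in org.apache.hadoop.mapreduce is:

    public abstract class InputFormat<K, V> {
        //computed on the client when the job is prepared
        public abstract List<InputSplit> getSplits(JobContext context)
                throws IOException, InterruptedException;
        //created on the task side; the mapper loop drives nextKeyValue()
        public abstract RecordReader<K, V> createRecordReader(InputSplit split,
                TaskAttemptContext context) throws IOException, InterruptedException;
    }
    //FileInputFormat additionally exposes isSplitable(JobContext, Path).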

Custom InputFormat + RecordReader

1.Create the InputFormat (the RecordReader is implemented as an inner class)
    package com.it18zhang.hadoop.mapreduce.inputformat;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FSDataInputStream;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.BytesWritable;
    import org.apache.hadoop.io.IOUtils;
    import org.apache.hadoop.io.NullWritable;
    import org.apache.hadoop.mapreduce.InputSplit;
    import org.apache.hadoop.mapreduce.JobContext;
    import org.apache.hadoop.mapreduce.RecordReader;
    import org.apache.hadoop.mapreduce.TaskAttemptContext;
    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
    import org.apache.hadoop.mapreduce.lib.input.FileSplit;

    import java.io.IOException;

    /**
     * Custom InputFormat that reads an entire file as a single record.
     */
    public class WholeFileInputFormat extends FileInputFormat<NullWritable,BytesWritable> {

        /**
         * The whole file must go to one mapper, so it is not splittable.
         */
        protected boolean isSplitable(JobContext context, Path filename) {
            return false ;
        }

        public RecordReader<NullWritable, BytesWritable> createRecordReader(InputSplit split, TaskAttemptContext context) throws IOException, InterruptedException {
            return new WholeFileRecordReader();
        }

        /**
         * Reader
         */
        class WholeFileRecordReader extends RecordReader<NullWritable, BytesWritable> {
            private FileSplit fileSplit;
            private Configuration conf;
            private BytesWritable value = new BytesWritable();
            private boolean processed = false;

            @Override
            public void initialize(InputSplit split, TaskAttemptContext context)
                    throws IOException, InterruptedException {
                this.fileSplit = (FileSplit) split;
                this.conf = context.getConfiguration();
            }

            @Override
            public boolean nextKeyValue() throws IOException, InterruptedException {
                if (!processed) {
                    //allocate a byte array sized from the split length (the whole file)
                    byte[] contents = new byte[(int) fileSplit.getLength()];
                    //get the file path from the split
                    Path file = fileSplit.getPath();
                    FileSystem fs = file.getFileSystem(conf);
                    FSDataInputStream in = null;
                    try {
                        in = fs.open(file);
                        IOUtils.readFully(in, contents, 0, contents.length);
                        value.set(contents, 0, contents.length);
                    } finally {
                        IOUtils.closeStream(in);
                    }
                    processed = true;
                    return true;
                }
                return false;
            }

            public NullWritable getCurrentKey() throws IOException, InterruptedException {
                return NullWritable.get();
            }

            public BytesWritable getCurrentValue() throws IOException,
                    InterruptedException {
                return value;
            }

            public float getProgress() throws IOException {
                return processed ? 1.0f : 0.0f;
            }

            public void close() throws IOException {
            }
        }
    }

2.Change the Mapper's input key/value types
    package com.it18zhang.hadoop.mapreduce.inputformat;

    import org.apache.hadoop.io.*;
    import org.apache.hadoop.mapreduce.Mapper;

    import java.io.BufferedReader;
    import java.io.IOException;
    import java.io.StringReader;

    /**
     * Mapper that receives the whole file content as a single BytesWritable value.
     */
    public class WordCountMapper extends Mapper<NullWritable,BytesWritable,Text,IntWritable> {

        /**
         *
         * @param key
         * @param value
         * @param context
         * @throws IOException
         * @throws InterruptedException
         */
        protected void map(NullWritable key, BytesWritable value, Context context) throws IOException, InterruptedException {
            //number of valid bytes in the BytesWritable buffer
            int len = value.getLength();
            String doc = new String(value.getBytes(), 0, len);
            System.out.println("doc == " + doc );
            System.out.println("===============================");
            StringReader reader = new StringReader(doc);
            BufferedReader bufReader = new BufferedReader(reader) ;
            String line = null ;
            while((line = bufReader.readLine()) != null){
                String[] arr = line.split(" ");
                for(String word : arr){
                    context.write(new Text(word),new IntWritable(1));
                }
            }
            bufReader.close();
        }
    }

3.Update the App (driver) code
    package com.it18zhang.hadoop.mapreduce.inputformat;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.IntWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
    import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

    /**
     * Driver for the whole-file word count.
     */
    public class App {
        public static void main(String[] args) throws Exception {

            Configuration conf = new Configuration();
            conf.set("fs.defaultFS","file:///");
            conf.set("mapreduce.framework.name","local");
            //创建job
            Job job = Job.getInstance(conf);
            job.setJobName("WordCount");
            job.setJarByClass(App.class);

            //add the input path (can be called multiple times)
            FileInputFormat.addInputPath(job,new Path("d:/mr/wc/hello.txt"));
            //set the output path (only one, and the directory must not already exist)
            FileOutputFormat.setOutputPath(job, new Path("d:/mr/wc/out"));

            //set the input format and the mapper/reducer classes
            job.setInputFormatClass(WholeFileInputFormat.class);
            job.setMapperClass(WordCountMapper.class);
            job.setReducerClass(WordCountReducer.class);
            job.setNumReduceTasks(1);

            job.setMapOutputKeyClass(Text.class);
            job.setMapOutputValueClass(IntWritable.class);

            //set the output key/value types
            job.setOutputKeyClass(Text.class);
            job.setOutputValueClass(IntWritable.class);
            job.waitForCompletion(true);
        }
    }

4.Run and check the result.

With the default TextInputFormat the number of mappers is not directly controllable; with NLineInputFormat and DBInputFormat it is. Split-size settings can still influence it indirectly.
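
A sketch of that indirect control (the sizes are arbitrary examples):

    //for splittable files, FileInputFormat derives the number of splits
    //(and therefore map tasks) from these bounds and the block size
    FileInputFormat.setMinInputSplitSize(job, 64L * 1024 * 1024);   //64 MB
    FileInputFormat.setMaxInputSplitSize(job, 128L * 1024 * 1024);  //128 MB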

SequenceFileInputFormat

Splittable by default (isSplitable returns true).
Its createRecordReader() returns a SequenceFileRecordReader<K,V>, which drives nextKeyValue().
Input files typically carry the *.seq (SequenceFile) extension.
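
A minimal usage sketch, assuming the SequenceFile was written as <Text, IntWritable> pairs (the path is just an example):

    job.setInputFormatClass(SequenceFileInputFormat.class);
    FileInputFormat.addInputPath(job, new Path("d:/mr/seq/data.seq"));
    //the mapper then declares Mapper<Text, IntWritable, ...> to match the file's key/value types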

KeyValueTextInputFormat

Plain text files; the key is everything before a configurable separator (a single character only), the value everything after it.
createRecordReader() returns a KeyValueLineRecordReader, which holds the separator sepStr (default: \t).
The separator can be customized via: job.getConfiguration().set(KeyValueLineRecordReader.KEY_VALUE_SEPERATOR,",");
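
In the driver this pairs with selecting the input format (a sketch):

    job.setInputFormatClass(KeyValueTextInputFormat.class);
    //the mapper signature then becomes Mapper<Text, Text, ...>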

NLineInputFormat

Every N lines form one split (default: 1 line).
 //set the number of lines per split:
    NLineInputFormat.setNumLinesPerSplit(job, 2);
 //set the input format:
    job.setInputFormatClass(NLineInputFormat.class);

MultipleInputs

MultipleInputs.addInputPath(job, ncdcInputPath,TextInputFormat.class, MaxTemperatureMapper.class);
MultipleInputs.addInputPath(job, metOfficeInputPath,TextInputFormat.class, MetOfficeMaxTemperatureMapper.class);

DBInputFormat

Excerpt from DBInputFormat.getSplits() (simplified):

    public List<InputSplit> getSplits(JobContext job) throws IOException {
        ResultSet results = null;
        Statement statement = null;
        try {
            statement = connection.createStatement();
            results = statement.executeQuery(getCountQuery());
            results.next();
            //total number of rows returned by the count query
            long count = results.getLong(1);
            //number of splits: defaults to 1, configurable via MRJobConfig.NUM_MAPS
            int chunks = job.getConfiguration().getInt(MRJobConfig.NUM_MAPS, 1);
            //rows per split (i.e. rows handled by each mapper)
            long chunkSize = (count / chunks);
            //... a DBInputSplit(start, end) is then built for each chunk


Extract data from a database as the input source for MR.
1.Prepare the MySQL data.
    Database: big5
2.select * from test ;
    +----+----------+------+
    | id | name     | age  |
    +----+----------+------+
    |  1 | kkk      |   20 |
    |  2 | tomas    |   13 |
    |  3 | tomasLee |   14 |
    +----+----------+------+

3.Desired map output: id -> name:age, e.g.  1    kkk:20
    NUM_MAPS is configurable and determines the number of splits.

    The record reader yields <LongWritable, DBWritable> pairs.
4.DBInputFormat analysis
    DBInputSplit{
        start       //start row index
        end         //end row index
    }
5.Custom MyDBWritable
    package com.it18zhang.hadoop.mapreduce.inputformat.db;

    import org.apache.hadoop.io.Writable;
    import org.apache.hadoop.mapreduce.lib.db.DBWritable;

    import java.io.DataInput;
    import java.io.DataOutput;
    import java.io.IOException;
    import java.sql.PreparedStatement;
    import java.sql.ResultSet;
    import java.sql.SQLException;

    /**
     * Custom Writable + DBWritable for rows of the test table.
     */
    public class MyDBWritable implements Writable,DBWritable{
        private int id  = 0;
        private String name = "" ;
        private int age = 0 ;

        public void write(DataOutput out) throws IOException {
            out.writeInt(id);
            out.writeUTF(name);
            out.writeInt(age);
        }

        public void readFields(DataInput in) throws IOException {
            this.id = in.readInt() ;
            this.name = in.readUTF() ;
            this.age = in.readInt() ;

        }

        public void write(PreparedStatement statement) throws SQLException {
            //not needed: this job only reads from the database
        }

        /**
         * Read one row from the JDBC ResultSet.
         */
        public void readFields(ResultSet rs) throws SQLException {
            this.id = rs.getInt("id");
            this.name = rs.getString("name");
            this.age = rs.getInt("age");
        }

        //getters so that DBMapper can read the (private) fields
        public int getId() {
            return id;
        }

        public String getName() {
            return name;
        }

        public int getAge() {
            return age;
        }
    }

6.Create the DBMapper
    package com.it18zhang.hadoop.mapreduce.inputformat.db;

    import org.apache.hadoop.io.IntWritable;
    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Mapper;

    import java.io.IOException;

    /**
     * Mapper that receives <record index, MyDBWritable> pairs from DBInputFormat.
     */
    public class DBMapper extends Mapper<LongWritable,MyDBWritable,IntWritable,Text> {

        protected void map(LongWritable key, MyDBWritable value, Context context) throws IOException, InterruptedException {
            int id = value.getId();
            String name = value.getName();
            int age = value.getAge();
            context.write(new IntWritable(id), new Text(name + ":" + age));
        }
    }

7.Add the MySQL JDBC driver to pom.xml
    <dependency>
        <groupId>mysql</groupId>
        <artifactId>mysql-connector-java</artifactId>
        <version>5.0.8</version>
    </dependency>

8.App (driver)
    package com.it18zhang.hadoop.mapreduce.inputformat.db;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.IntWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.lib.db.DBConfiguration;
    import org.apache.hadoop.mapreduce.lib.db.DBInputFormat;
    import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

    /**
     * Driver that reads rows from MySQL via DBInputFormat and writes id -> name:age pairs.
     */
    public class App {
        public static void main(String[] args) throws Exception {

            Configuration conf = new Configuration();
            conf.set("fs.defaultFS","file:///");
            conf.set("mapreduce.framework.name","local");

            Job job = Job.getInstance(conf);
            job.setJobName("DBDemo");
            job.setJarByClass(App.class);

            //database connection info
            String driverClass = "com.mysql.jdbc.Driver" ;
            String url = "jdbc:mysql://localhost:3306/big5" ;
            String name = "root" ;
            String password = "root" ;
            DBConfiguration.configureDB(job.getConfiguration(),driverClass,url,name,password);

            //set the DBWritable implementation class
            job.getConfiguration().set(DBConfiguration.INPUT_CLASS_PROPERTY,"com.it18zhang.hadoop.mapreduce.inputformat.db.MyDBWritable");

            //set the count query used by getSplits() to size the chunks
            job.getConfiguration().set(DBConfiguration.INPUT_COUNT_QUERY,"select count(*) from test");

            //set the table name
            job.getConfiguration().set(DBConfiguration.INPUT_TABLE_NAME_PROPERTY,"test");
            //field list
            job.getConfiguration().set(DBConfiguration.INPUT_FIELD_NAMES_PROPERTY,"id,name,age");

            //use DBInputFormat as the input format
            job.setInputFormatClass(DBInputFormat.class);

            FileOutputFormat.setOutputPath(job, new Path("d:/mr/out"));

            job.setMapperClass(DBMapper.class);

            job.setMapOutputKeyClass(IntWritable.class);
            job.setMapOutputValueClass(Text.class);

            //set the output key/value types
            job.setOutputKeyClass(IntWritable.class);
            job.setOutputValueClass(Text.class);

            job.waitForCompletion(true);
        }
    }
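
A sketch of the equivalent setup using the DBInputFormat.setInput() helper instead of setting the individual DBConfiguration keys by hand:

    DBInputFormat.setInput(job, MyDBWritable.class,
            "test",                 //table name
            null,                   //conditions (WHERE clause), none here
            null,                   //orderBy column, none here
            "id", "name", "age");   //field names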
