The ResourceManager source code lives in the hadoop-yarn-server-resourcemanager artifact; add it as a pom dependency:
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-yarn-server-resourcemanager</artifactId>
<version>2.7.3</version>
</dependency>
RPC
remote procedure call
IPC
inter-process communication; implemented over sockets and I/O (Channel | Socket)
PB
a data serialization format, alongside Avro, JSON and XML.
Protobuf,
Google's serialization system.
The (de)serialization code is generated automatically from .proto definitions.
MR job submission process
1. Local mode
mapreduce.Job
--> JobSubmitter.submitJobInternal()
--> preparation (check the output directory, upload job files)
--> LocalJobRunner.submitJob();
--> converts the mapreduce.Job into a LocalJobRunner$Job (a Thread) and starts that thread
[spawns a new thread]
--------------------
LocalJobRunner$Job.run()
--> reads the split metainfo and creates a List<LocalJobRunner$Job$MapTaskRunnable>
--> creates a thread pool
--> runs the map tasks through the thread pool.
[spawns new threads to run the map tasks]
---------------------------
LocalJobRunner$Job$MapTaskRunnable.run()
--> new MapTask()
--> MapTask.runNewMapper()
--> creates the WordCountMapper via reflection (..)
--> WordCountMapper.run();
setup();
for(){
map(...)
}
cleanup();
[spawns new threads to run the reduce tasks]
---------------------------
LocalJobRunner$Job$ReduceTaskRunnable.run()
--> new ReduceTask()
--> ReduceTask.runNewReducer()
--> creates the WordCountReducer via reflection (..)
--> WordCountReducer.run();
setup();
for(){
reduce(...)
}
cleanup();
2. Fully distributed mode
Job          //the MapReduce job
Application  //the YARN application
1. Run the job
2. Obtain an application ID from the RM
3. Upload the job resources to HDFS:
job.xml
job.jar
job.split
job.splitmetainfo
4. Submit the application to the RM
5. Create the MRAppMaster
5.a) the RM contacts an NM
5.b) the NM launches the MRAppMaster
6. The AppMaster initializes the job
7. The AppMaster retrieves the split info from HDFS
8. The AppMaster asks the RM for resources; the RM returns a list of containers.
9. The AppMaster locates the corresponding hosts from that list and contacts their NMs
10. Each NM launches a YarnChild in a new JVM; the YarnChild runs a MapTask or ReduceTask
3. The underlying data transfer uses Google's protobuf serialization together with ProtobufRpcEngine.
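A minimal sketch of binding a protocol to ProtobufRpcEngine; MyProtocolPB is only a stand-in here, since a real PB protocol also needs protobuf-generated message and stub classes, which are omitted:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.ipc.ProtobufRpcEngine;
import org.apache.hadoop.ipc.RPC;

public class PbEngineSketch {
    //stand-in for a protobuf-generated protocol interface
    interface MyProtocolPB {}

    public static void main(String[] args) {
        Configuration conf = new Configuration();
        //tell the RPC layer to (de)serialize calls on this protocol with protobuf
        RPC.setProtocolEngine(conf, MyProtocolPB.class, ProtobufRpcEngine.class);
        //this conf is then handed to RPC.Builder on the server and RPC.getProxy on the client
    }
}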
combiner
a "reducer on the map side": pre-aggregates map output before the shuffle to improve performance.
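A minimal way to switch it on for the word-count job from these notes:

//fragment for App.main(), next to job.setReducerClass(WordCountReducer.class):
job.setCombinerClass(WordCountReducer.class);   //pre-aggregate map output before the shuffle

A reducer can only be reused as a combiner when its logic is associative and commutative; summing counts is, an averaging reducer is not.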
partition
decides which reducer each map output record goes to (default: HashPartitioner on the key's hash).
split
a split contains only offset and length information, not the data itself; the number of splits = the number of mappers (see the fragment below for how split size is controlled).
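For file inputs the split count can be influenced through the min/max split-size settings; a small fragment (the 64 MB cap is only an illustrative value):

//fragment for App.main(): FileInputFormat computes splitSize = max(minSize, min(maxSize, blockSize))
FileInputFormat.setMinInputSplitSize(job, 1);                  //lower bound, in bytes
FileInputFormat.setMaxInputSplitSize(job, 64L * 1024 * 1024);  //upper bound, in bytes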
Mitigating data skew
1. Redesign the key (e.g. salt hot keys with a random prefix)
2. Use a custom partition function instead of the default HashPartitioner (see the sketch after this list)
3. Shuffle: the redistribution ("mixing") of map output to the reducers.
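A sketch of point 2, a custom Partitioner; the hot key "the" and the scatter strategy are assumptions made only for illustration:

import java.util.Random;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Partitioner;

/**
 * Spread a known hot key across all reducers; hash everything else
 * the same way HashPartitioner does.
 */
public class SkewAwarePartitioner extends Partitioner<Text, IntWritable> {
    private final Random random = new Random();

    @Override
    public int getPartition(Text key, IntWritable value, int numPartitions) {
        if ("the".equals(key.toString())) {              //hypothetical hot key
            return random.nextInt(numPartitions);        //scatter it over all reducers
        }
        return (key.hashCode() & Integer.MAX_VALUE) % numPartitions;
    }
}

Enable it with job.setPartitionerClass(SkewAwarePartitioner.class). Scattering one key means each reducer only sees a partial count for it, so a second aggregation job (or a redesigned, salted key as in point 1) is still needed to merge the partial results.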
RpcKind values
1. RPC_BUILTIN
2. RPC_WRITABLE
3. RPC_PROTOCOL_BUFFER
Hadoop IPC (inter-process communication) + RPC (remote procedure call)
1. Define the interface
package com.it18zhang.hadoop.ipc;
import org.apache.hadoop.ipc.VersionedProtocol;
/**
* Created by Administrator on 2017/5/26.
*/
public interface IHelloWorldService extends VersionedProtocol {
static final long versionID = 1 ;
public String sayHello(String name);
}
2. Implement the interface
package com.it18zhang.hadoop.ipc;
import org.apache.hadoop.ipc.ProtocolSignature;
import java.io.IOException;
/**
* Created by Administrator on 2017/5/26.
*/
public class HelloWorldServiceImpl implements IHelloWorldService {
public String sayHello(String name) {
System.out.println(name);
return "hello " + name;
}
/**
* Protocol version: return the service's versionID so the client-side version check passes.
* @param protocol
* @param clientVersion
* @return
* @throws IOException
*/
public long getProtocolVersion(String protocol, long clientVersion) throws IOException {
return IHelloWorldService.versionID;
}
/**
* Protocol signature: the protocol version plus method hashes.
* @param protocol
* @param clientVersion
* @param clientMethodsHash
* @return
* @throws IOException
*/
public ProtocolSignature getProtocolSignature(String protocol, long clientVersion, int clientMethodsHash) throws IOException {
return new ProtocolSignature(IHelloWorldService.versionID, null);
}
}
3. Create the Server
package com.it18zhang.hadoop.ipc;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.ipc.RPC;
import java.io.IOException;
/**
* Created by Administrator on 2017/5/26.
*/
public class MyServer {
public static void main(String[] args) throws IOException {
new RPC.Builder(new Configuration())
.setProtocol(IHelloWorldService.class)
.setInstance(new HelloWorldServiceImpl())
.setBindAddress("localhost")
.setPort(8888)
.build().start();
}
}
4. Create the Client
package com.it18zhang.hadoop.ipc;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.ipc.RPC;
import java.net.InetSocketAddress;
/**
* Created by Administrator on 2017/5/26.
*/
public class MyClient {
public static void main(String[] args) throws Exception {
IHelloWorldService s = RPC.getProxy(IHelloWorldService.class,
IHelloWorldService.versionID,
new InetSocketAddress("localhost", 8888),
new Configuration());
String msg = s.sayHello("tom");
System.out.println(msg);
RPC.stopProxy(s);   //release the proxy's connection when done
}
}
Carriage return
CR //carriage return, \r
Line feed
LF //line feed, \n
Killing a process on Windows
taskkill /F /PID xxx
Client-side InputFormat / RecordReader call sequence
Client -> FileInputFormat.getSplits() -> TextInputFormat.isSplitable(..)
org.apache.hadoop.mapreduce.lib.input.TextInputFormat --> createRecordReader() --> LineRecordReader
--> nextKeyValue()
InputFormat offers getSplits(), isSplitable() and createRecordReader(); the first two are called on the client while the job input is being prepared.
The RecordReader sits in front of the mapper: the framework repeatedly calls nextKeyValue() and feeds each key/value pair to map().
Custom InputFormat + RecordReader
1. Create the InputFormat (with its inner RecordReader)
package com.it18zhang.hadoop.mapreduce.inputformat;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
import java.io.IOException;
/**
* Custom InputFormat: reads an entire file as a single record.
*/
public class WholeFileInputFormat extends FileInputFormat<NullWritable,BytesWritable> {
/**
* Not splittable: each whole file becomes exactly one split.
* @param context
* @param filename
* @return
*/
protected boolean isSplitable(JobContext context, Path filename) {
return false ;
}
public RecordReader<NullWritable, BytesWritable> createRecordReader(InputSplit split, TaskAttemptContext context) throws IOException, InterruptedException {
return new WholeFileRecordReader();
}
/**
* RecordReader that emits the whole file as one (NullWritable, BytesWritable) record.
*/
class WholeFileRecordReader extends RecordReader<NullWritable, BytesWritable> {
private FileSplit fileSplit;
private Configuration conf;
private BytesWritable value = new BytesWritable();
private boolean processed = false;
@Override
public void initialize(InputSplit split, TaskAttemptContext context)
throws IOException, InterruptedException {
this.fileSplit = (FileSplit) split;
this.conf = context.getConfiguration();
}
@Override
public boolean nextKeyValue() throws IOException, InterruptedException {
if (!processed) {
//use the split length to size the content buffer
byte[] contents = new byte[(int) fileSplit.getLength()];
//get the file path from the split
Path file = fileSplit.getPath();
FileSystem fs = file.getFileSystem(conf);
FSDataInputStream in = null;
try {
in = fs.open(file);
IOUtils.readFully(in, contents, 0, contents.length);
value.set(contents, 0, contents.length);
} finally {
IOUtils.closeStream(in);
}
processed = true;
return true;
}
return false;
}
public NullWritable getCurrentKey() throws IOException, InterruptedException {
return NullWritable.get();
}
public BytesWritable getCurrentValue() throws IOException,
InterruptedException {
return value;
}
public float getProgress() throws IOException {
return processed ? 1.0f : 0.0f;
}
public void close() throws IOException {
}
}
}
2. Change the Mapper's input key/value types
package com.it18zhang.hadoop.mapreduce.inputformat;
import org.apache.hadoop.io.*;
import org.apache.hadoop.mapreduce.Mapper;
import java.io.BufferedReader;
import java.io.IOException;
import java.io.StringReader;
/**
* Mapper for whole-file input: the key is NullWritable, the value is the file's bytes.
*/
public class WordCountMapper extends Mapper<NullWritable,BytesWritable,Text,IntWritable> {
/**
*
* @param key
* @param value
* @param context
* @throws IOException
* @throws InterruptedException
*/
protected void map(NullWritable key, BytesWritable value, Context context) throws IOException, InterruptedException {
//number of valid bytes held by the BytesWritable
int len = value.getLength();
String doc = new String(value.getBytes(),0,len);
System.out.println("doc == " + doc );
System.out.println("===============================");
StringReader reader = new StringReader(doc);
BufferedReader bufReader = new BufferedReader(reader) ;
String line = null ;
while((line = bufReader.readLine()) != null){
String[] arr = line.split(" ");
for(String word : arr){
context.write(new Text(word),new IntWritable(1));
}
}
bufReader.close();
}
}
3. Modify the App code
package com.it18zhang.hadoop.mapreduce.inputformat;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
/**
* Driver for the whole-file word-count job.
*/
public class App {
public static void main(String[] args) throws Exception {
Configuration conf = new Configuration();
conf.set("fs.defaultFS","file:///");
conf.set("mapreduce.framework.name","local");
//create the job
Job job = Job.getInstance(conf);
job.setJobName("WordCount");
job.setJarByClass(App.class);
//add an input path (may be called multiple times)
FileInputFormat.addInputPath(job,new Path("d:/mr/wc/hello.txt"));
//set the output path; only one, and the directory must not already exist
FileOutputFormat.setOutputPath(job, new Path("d:/mr/wc/out"));
//set the input format, mapper and reducer classes
job.setInputFormatClass(WholeFileInputFormat.class);
job.setMapperClass(WordCountMapper.class);
job.setReducerClass(WordCountReducer.class);
job.setNumReduceTasks(1);
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(IntWritable.class);
//set the output key/value types
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(IntWritable.class);
job.waitForCompletion(true);
}
}
4. Run and check the results.
With the default TextInputFormat the number of mappers is not set directly; it follows from the input splits (file sizes and block/split size). NLineInputFormat and DBInputFormat do give direct control over it.
SequenceFileInputFormat
splittable by default (isSplitable() returns true);
createRecordReader() returns a SequenceFileRecordReader<K,V>, driven through nextKeyValue();
input files are sequence files (*.seq).
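A small fragment for reading sequence files; the Text/IntWritable types and the input directory are assumptions and must match what was actually written into the .seq files:

//fragment for App.main():
job.setInputFormatClass(SequenceFileInputFormat.class);
FileInputFormat.addInputPath(job, new Path("d:/mr/seq"));   //hypothetical directory of *.seq files
//the mapper is then declared as Mapper<Text, IntWritable, ...> to match the stored key/value types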
KeyValueTextInputFormat
plain text files; each line is split into key and value on a configurable separator (a single character only).
createRecordReader() --> KeyValueLineRecordReader(), which holds the separator (default: \t)
the separator can be customized via: job.getConfiguration().set(KeyValueLineRecordReader.KEY_VALUE_SEPERATOR,",");
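Putting the two lines above together, a minimal fragment (the ',' separator is just an example):

//fragment for App.main():
job.getConfiguration().set(KeyValueLineRecordReader.KEY_VALUE_SEPERATOR, ",");
job.setInputFormatClass(KeyValueTextInputFormat.class);
//each line "k,v" then reaches the mapper as key=Text("k"), value=Text("v"),
//so the mapper is declared as Mapper<Text, Text, ...>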
NLineInputFormat
every N lines form one split (default: 1 line).
//set the number of lines per split:
NLineInputFormat.setNumLinesPerSplit(job,2);
//set the input format:
job.setInputFormatClass(NLineInputFormat.class);
MultipleInputs
binds a different InputFormat and Mapper to each input path:
MultipleInputs.addInputPath(job, ncdcInputPath,TextInputFormat.class, MaxTemperatureMapper.class);
MultipleInputs.addInputPath(job, metOfficeInputPath,TextInputFormat.class, MetOfficeMaxTemperatureMapper.class);
DBInputFormat
public List<InputSplit> getSplits(JobContext job) throws IOException {
ResultSet results = null;
Statement statement = null;
try {
statement = connection.createStatement();
results = statement.executeQuery(getCountQuery());
results.next();
//total row count returned by the count query
long count = results.getLong(1);
//number of splits; defaults to 1 and is configurable
int chunks = job.getConfiguration().getInt(MRJobConfig.NUM_MAPS, 1);
//rows per split, i.e. rows handled by each mapper
long chunkSize = (count / chunks);
// ... the remainder of the method builds one DBInputSplit(start, end) per chunk
Pulls data out of a database to serve as the MR input source.
1. Prepare the MySQL data (database: big5).
2. select * from test;
+----+----------+------+
| id | name     | age  |
+----+----------+------+
|  1 | kkk      |   20 |
|  2 | tomas    |   13 |
|  3 | tomasLee |   14 |
+----+----------+------+
3. Each mapper output line has the form "id<TAB>name:age", e.g. 1    kkk:20
NUM_MAPS is configurable and determines the number of splits.
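A small fragment showing how that could be set (3 is an arbitrary example value):

//fragment for App.main(): MRJobConfig.NUM_MAPS is the "mapreduce.job.maps" property
job.getConfiguration().setInt(MRJobConfig.NUM_MAPS, 3);   //getSplits() will then produce 3 DBInputSplits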
The record reader is a RecordReader<LongWritable, DBWritable>: the key is the row index (LongWritable), the value is the DBWritable instance.
4. DBInputFormat analysis
DBInputSplit{
start //start row index of the split
end //end row index of the split
}
5. Custom MyDBWritable
package com.it18zhang.hadoop.mapreduce.inputformat.db;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.mapreduce.lib.db.DBWritable;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
/**
* Custom Writable + DBWritable
*/
public class MyDBWritable implements Writable,DBWritable{
private int id = 0;
private String name = "" ;
private int age = 0 ;
public void write(DataOutput out) throws IOException {
out.writeInt(id);
out.writeUTF(name);
out.writeInt(age);
}
public void readFields(DataInput in) throws IOException {
this.id = in.readInt() ;
this.name = in.readUTF() ;
this.age = in.readInt() ;
}
//writing back to the database is only needed with DBOutputFormat; unused for input
public void write(PreparedStatement statement) throws SQLException {
}
/**
* Read one row's columns from the JDBC ResultSet.
*/
public void readFields(ResultSet rs) throws SQLException {
this.id = rs.getInt("id");
this.name = rs.getString("name");
this.age = rs.getInt("age");
}
//getters so other classes such as DBMapper can read the fields
public int getId() { return id; }
public String getName() { return name; }
public int getAge() { return age; }
}
6. Create the DBMapper
package com.it18zhang.hadoop.mapreduce.inputformat.db;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import java.io.IOException;
/**
* DBMapper
*/
public class DBMapper extends Mapper<LongWritable,MyDBWritable,IntWritable,Text> {
protected void map(LongWritable key, MyDBWritable value, Context context) throws IOException, InterruptedException {
int id = value.getId();
String name = value.getName();
int age = value.getAge();
context.write(new IntWritable(id),new Text(name + ":" + age));
}
}
7. Add the MySQL JDBC driver to pom.xml
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
<version>5.0.8</version>
</dependency>
8.App
package com.it18zhang.hadoop.mapreduce.inputformat.db;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.db.DBConfiguration;
import org.apache.hadoop.mapreduce.lib.db.DBInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
/**
*
*/
public class App {
public static void main(String[] args) throws Exception {
Configuration conf = new Configuration();
conf.set("fs.defaultFS","file:///");
conf.set("mapreduce.framework.name","local");
Job job = Job.getInstance(conf);
job.setJobName("DBDemo");
job.setJarByClass(App.class);
//database connection info
String driverClass = "com.mysql.jdbc.Driver" ;
String url = "jdbc:mysql://localhost:3306/big5" ;
String name = "root" ;
String password = "root" ;
DBConfiguration.configureDB(job.getConfiguration(),driverClass,url,name,password);
//set the DBWritable class
job.getConfiguration().set(DBConfiguration.INPUT_CLASS_PROPERTY,"com.it18zhang.hadoop.mapreduce.inputformat.db.MyDBWritable");
//set the row-count query (used by getSplits to size the chunks)
job.getConfiguration().set(DBConfiguration.INPUT_COUNT_QUERY,"select count(*) from test");
//set the table name
job.getConfiguration().set(DBConfiguration.INPUT_TABLE_NAME_PROPERTY,"test");
//field list
job.getConfiguration().set(DBConfiguration.INPUT_FIELD_NAMES_PROPERTY,"id,name,age");
//set the database input format
job.setInputFormatClass(DBInputFormat.class);
FileOutputFormat.setOutputPath(job, new Path("d:/mr/out"));
job.setMapperClass(DBMapper.class);
job.setMapOutputKeyClass(IntWritable.class);
job.setMapOutputValueClass(Text.class);
//set the output key/value types (same as the map output, since the default identity reducer is used)
job.setOutputKeyClass(IntWritable.class);
job.setOutputValueClass(Text.class);
job.waitForCompletion(true);
}
}