上一个笔记中记录了基于BIO下的Tomcat的简单实现,做到了有请求进来可以进行处理。基础的逻辑实现了。下面想基于NIO实现相关逻辑。
1 代码结构
其中的html代码是拷贝的上一笔记中的。代码的结构也基本一致。
2 实现逻辑
这里的实现逻辑步骤如下:
1 启动线程监听连接
2 启动线程监听Selector,如果有对应的SelectionKey,包装成处理任务,丢入到队列中等待处理
3 启动线程用于监听任务队列,从队列中读取待处理的任务,针对不同的任务当前的状态进行操作
整理的请求进来后的流转过程如下:
3 代码说明
3.1 连接监听
定义AcceptHandler类,实现Runnable接口,绑定对应的接口后,启动线程,并开始监听连接,用于接受连接的ServerSocketChannel需要标记为阻塞的,当前这里可以设置为非阻塞,那么在下面channel.accept()中会收到Null值,需要做对应的判断和处理。
/**
* 启动服务端socket
* @return
*/
public AcceptHandler socket(){
try {
channel = ServerSocketChannel.open();
channel.socket().bind(new InetSocketAddress(port));
channel.configureBlocking(true);
selector = Selector.open();
} catch (IOException e) {
//抛出异常,显示服务端出现问题,暂不处理
e.printStackTrace();
}
return this;
}
/**
* 连接请求接受线程
*/
@Override
public void run(){
while(true){
try{
SocketChannel socketChannel = channel.accept();
socketChannel.configureBlocking(false);
socketChannel.register(selector,SelectionKey.OP_READ,true);
}catch (Exception e){
e.printStackTrace();
}
}
}
监听到连接后,构造对应的SocketChannel,这个时候SocketChannel需要设置为非阻塞,同时连接已经进来了,想Selector中注册为可读状态。等待任务包装线程包装后丢入到任务队列,等待处理。
3.2 任务包装线程
任务包装线程,从Selector中读取注册过来的SelectionKey,将这些key进行包装处理,包装后将任务丢入到队列中等待处理。具体的逻辑如下:
public AcceptHandler processor(){
pool.submit(new PollorEvent());
return this;
}
public class PollorEvent implements Runnable{
@Override
public void run() {
while(true){
try{
int keyCount = selector.selectNow();
Iterator<SelectionKey> iterator =
keyCount > 0 ? selector.selectedKeys().iterator() : null;
while (iterator != null && iterator.hasNext()) {
SelectionKey sk = iterator.next();
boolean flag = (boolean)sk.attachment();
if(flag){
task.offer(new SocketProcessor(sk));
sk.attach(false);
}
iterator.remove();
}
}catch (Exception e){
e.printStackTrace();
}
}
}
}
这里有一个flag的标记,由于一个SelectionKey在注册以后会丢入到线程池中等待被处理,在未被处理前,也就是未被释放前,selector.selectNow()每次都会获取到,那么如果没有标记,每次都会被获取到,同时每次都会被包装一下丢入队列,等待处理。因此,这个给每一个监听到的SelectionKey都标记为是否已经丢入到队列的标记。通过:
sk.attach(false)
在每次获取到这个SelectionKey时,获取设置的值,如果为false,标识无需处理。这个值的初始化,在接收到连接时,向Selector中注册时就已经绑定了。
每一个SelectionKey被包装为SocketProcessor对象后丢入到队列中,这里的队列设置如下:
private LinkedBlockingQueue<SocketProcessor> task = new LinkedBlockingQueue<>();
3.3 任务处理线程
任务处理线程主要是初始化一定量的线程后,所有的线程等待从队列中获取任务,获取到任务后,读取请求信息后,进行包装请求Request和Response.同时判断当前请求的类型,来进行分发处理。基础的代码如下:
public AcceptHandler executor(){
for(int i=0;i<10;i++){
pool.submit(new ProcessorExecutorTask());
}
return this;
}
public class ProcessorExecutorTask implements Runnable{
@Override
public void run() {
while(true){
try{
//从队列中读取一个任务开始进行处理
SocketProcessor processor = task.take();
//开始处理数据
processor.run();
}catch (Exception e){
e.printStackTrace();
}
}
}
}
public class SocketProcessor {
private static final String API_DATA_START = "/api";
private static final String STATIC_RESOURCE_URI = "static";
private Map<String,String> relation = new HashMap<>();
private SelectionKey sk;
public SocketProcessor(SelectionKey sk){
this.sk = sk;
relation.put("/api/test","com.simple.nio.tomcat.TestServlet");
}
public void run() throws IOException{
boolean closeSocket = false;
if (sk.isReadable()) {
//读取数据
Request request = new Request(sk);
request.prepareParseRequest();
//写入数据
Response response = new Response(sk,request);
processorRequest(request,response);
closeSocket = true;
}
if (!closeSocket && sk.isWritable()) {
closeSocket = true;
}
if (closeSocket) {
if (sk.isValid()) {
sk.cancel();
}
if(sk.channel().isOpen()){
sk.channel().close();
}
}
}
/**
* 处理请求
* @param request
* @param response
*/
public void processorRequest(Request request,Response response){
try {
//针对不同的请求字段,进行分开处理,如果是静态资源的数据,则使用StaticResourceProcessor,Api接口请求,则使用ApiResouceProcessor
if(request.getRequestPath().startsWith(API_DATA_START)){
new ApiResouceProcessor().proccess(request,response);
}else{
new StaticResourceProcessor().proccess(request,response);
}
}catch (Exception e){
e.printStackTrace();
}
}
/**
* 静态资源的处理
*/
public class StaticResourceProcessor {
/**
* 开始处理请求
* @param request
* @param response
* @throws IOException
*/
public void proccess(Request request, Response response) throws Exception {
//就是上面的那个字符串截取方法
String uri = request.getRequestPath();
//获取uri对应的静态资源
URL ROOT = this.getClass().getClassLoader().getResource(STATIC_RESOURCE_URI);
File staticResource = new File(ROOT.getPath() + uri);
byte[] bytes = null;
if(staticResource.exists() ||staticResource.isFile()){
bytes = IOUtils.toByteArray(new FileInputStream(staticResource));
}else{
staticResource = new File(ROOT.getPath() + "/404.html");
bytes = IOUtils.toByteArray(new FileInputStream(staticResource));
}
response.writer(bytes);
}
}
/**
* 接口资源的处理
*/
public class ApiResouceProcessor{
/**
* 开始处理请求
* @param request
* @param response
* @throws IOException
*/
public void proccess(Request request, Response response) throws Exception {
//就是上面的那个字符串截取方法
String uri = request.getRequestPath();
//找到对应的路径地址对应的servlt
String clazz = relation.getOrDefault(uri,null);
if (clazz == null || clazz.length() <= 0 ) {
throw new RuntimeException("no mapping relation of servlet");
}
//实例化该servlet对象
Class<?> servletClass = this.getClass().getClassLoader().loadClass(clazz);
//实例化对象
Servlet servlet = (Servlet)servletClass.newInstance();
//调用servlet的service方法
servlet.service(request,response);
}
}
}
ProcessorExecutorTask中很简单,只是从队列中获取任务等待处理,这里是take()方法是阻塞的,知道有任务可以获取到时返回。
SocketProcessor中主要是处理任务了,从SelectionKey中读取当前状态,如果是可读的,那么开始构造Request对象和Response对象。然后开始处理。
通过判断当前SelectionKey对象的状态来判断进行何种处理:
1 如果是OP_WRITE,则标记当前已经处理完了,关闭连接。
2 如果是OP_READ,则构造对象开始处理任务,同时判断是静态资源文件还是api接口
针对请求的处理:
1 如果是静态资源文件,则读取对应的文件处理后,包装为字节数组,写入到管道中国
2 如果是api请求,则找到当前地址对应的Servlet,反射实例化对应的Servlet,然后执行service方法。
这里针对api接口的处理逻辑中,可以学习springboot类似的逻辑,通过注解的形式,将这种映射关系先实例化好,请求过来直接进行处理。具体的后面学习springboot源码是在看。
3.4 请求对象和响应对象的包装
Request对象的包装代码如下:
public class Request implements ServletRequest {
/**
* 询Key
*/
private SelectionKey sk;
/**
* 解析用户请求后的URI
*/
private String requestStr;
private String requestMethod;
private String requestPath;
private Map<String,String> header = new HashMap<>();
public Request(SelectionKey sk) {
this.sk = sk;
}
/**
* 解析获取到的数据,丰富当前的request数据
*/
public void prepareParseRequest() throws IOException {
String header = readMessage(sk,1024);
this.requestStr = header;
List<String> list = Arrays.asList(requestStr.split("\r\n"));
// //解析请求方式,请求地址,http协议
parseMethodAndPathFromRequestStr(list.get(0));
// //解析请求头
parserRequestHeaderFromRequestStr(list);
}
private String readMessage(SelectionKey sk,int len) throws IOException{
//重置缓冲区,等待写入
ByteBuffer buffer = ByteBuffer.allocate(len);
//服务端想客户端写入数据
SocketChannel channel = (SocketChannel)sk.channel();
int num = channel.read(buffer);
return num > 0 ? new String(buffer.array(), 0, num):"";
}
/**
* 解析请求方式
* 解析请求地址
* 解析请求协议
* @param path
*/
private void parseMethodAndPathFromRequestStr(String path) throws IOException{
//针对请求头进行处理
List<String> strs = Arrays.asList(path.split(" "));
//切分好以后,进行参数结果判断
if (null == strs || strs.size() != 3) {
throw new IOException("error request");
}
//开始进行相应数据的赋值
this.requestMethod = strs.get(0);
this.requestPath = strs.get(1);
//开始解析路径参数
//解析参数
}
/**
* 解析请求头
* @param list
*/
private void parserRequestHeaderFromRequestStr(List<String> list) throws IOException{
if(list.size() <= 2){
return;
}
List<String> array = list.subList(1,list.size() - 1);
//循环遍历,将字符串进行切割后
for(String item : array){
String[] strs = item.split(": ");
if (strs == null || strs.length != 2) {
throw new IOException("error request");
}
header.put(strs[0],strs[1]);
}
}
xxxxxxxxxxx
}
Request主要是当请求进来以后,从管道中读取对应的信息,并解析请求地址,请求头数据等字段,这里只是用到了请求地址数据,其他的未做处理。
Response的包装代码如下:
public class Response implements ServletResponse {
private Request request;
private SelectionKey sk;
public Response(SelectionKey sk, Request request){
this.sk = sk;
this.request = request;
}
public void println(String message){
writer(200,"OK",message.getBytes());
}
public void writer(byte[] bytes){
writer(200,"OK",bytes);
}
public void writer(int code,String message,byte[] bytes){
String head = "HTTP/1.1 " + code + " " + message + "\r\n\r\n";
byte[] headBytes = head.getBytes();
byte[] result = new byte[headBytes.length + bytes.length];
System.arraycopy(headBytes,0, result,0, headBytes.length);
System.arraycopy(bytes,0, result, headBytes.length, bytes.length);
//重置缓冲区,等待写入
ByteBuffer buffer = ByteBuffer.allocate(result.length);
buffer.put(result);
buffer.flip();
//服务端想客户端写入数据
SocketChannel channel = (SocketChannel)sk.channel();
try {
channel.write(buffer);
} catch (IOException e) {
e.printStackTrace();
}
}
xxxxxxxxx
}
这里的代码其实很少,主要是向管道中写入内容。其他的没有什么。
对应的Servlet中向Response中写入内容时,调用的是:
((Response)response).println("Hello Servlet!");
最后是测试结果: