上一个笔记中记录了基于BIO下的Tomcat的简单实现,做到了有请求进来可以进行处理。基础的逻辑实现了。下面想基于NIO实现相关逻辑。

1 代码结构源码学习-tomcat的简单实现(二)

其中的html代码是拷贝的上一笔记中的。代码的结构也基本一致。

2 实现逻辑

这里的实现逻辑步骤如下:

1 启动线程监听连接
2 启动线程监听Selector,如果有对应的SelectionKey,包装成处理任务,丢入到队列中等待处理
3 启动线程用于监听任务队列,从队列中读取待处理的任务,针对不同的任务当前的状态进行操作

整理的请求进来后的流转过程如下:
源码学习-tomcat的简单实现(二)

3 代码说明

3.1 连接监听

定义AcceptHandler类,实现Runnable接口,绑定对应的接口后,启动线程,并开始监听连接,用于接受连接的ServerSocketChannel需要标记为阻塞的,当前这里可以设置为非阻塞,那么在下面channel.accept()中会收到Null值,需要做对应的判断和处理。

/**
     * 启动服务端socket
     * @return
     */
    public AcceptHandler socket(){
        try {
            channel = ServerSocketChannel.open();
            channel.socket().bind(new InetSocketAddress(port));
            channel.configureBlocking(true);

            selector = Selector.open();
        } catch (IOException e) {
            //抛出异常,显示服务端出现问题,暂不处理
            e.printStackTrace();
        }
        return this;
    }
/**
     * 连接请求接受线程
     */
    @Override
    public void run(){
        while(true){
            try{
                SocketChannel socketChannel = channel.accept();
                socketChannel.configureBlocking(false);
                socketChannel.register(selector,SelectionKey.OP_READ,true);
            }catch (Exception e){
                e.printStackTrace();
            }
        }
    }

监听到连接后,构造对应的SocketChannel,这个时候SocketChannel需要设置为非阻塞,同时连接已经进来了,想Selector中注册为可读状态。等待任务包装线程包装后丢入到任务队列,等待处理。

3.2 任务包装线程

任务包装线程,从Selector中读取注册过来的SelectionKey,将这些key进行包装处理,包装后将任务丢入到队列中等待处理。具体的逻辑如下:

public AcceptHandler processor(){
        pool.submit(new PollorEvent());
        return this;
    }
    public class PollorEvent implements Runnable{
        @Override
        public void run() {
            while(true){
                try{
                    int keyCount = selector.selectNow();
                    Iterator<SelectionKey> iterator =
                            keyCount > 0 ? selector.selectedKeys().iterator() : null;

                    while (iterator != null && iterator.hasNext()) {
                        SelectionKey sk = iterator.next();

                        boolean flag = (boolean)sk.attachment();
                        if(flag){
                            task.offer(new SocketProcessor(sk));
                            sk.attach(false);
                        }
                        iterator.remove();
                    }
                }catch (Exception e){
                    e.printStackTrace();
                }
            }
        }
    }

这里有一个flag的标记,由于一个SelectionKey在注册以后会丢入到线程池中等待被处理,在未被处理前,也就是未被释放前,selector.selectNow()每次都会获取到,那么如果没有标记,每次都会被获取到,同时每次都会被包装一下丢入队列,等待处理。因此,这个给每一个监听到的SelectionKey都标记为是否已经丢入到队列的标记。通过:

sk.attach(false)

在每次获取到这个SelectionKey时,获取设置的值,如果为false,标识无需处理。这个值的初始化,在接收到连接时,向Selector中注册时就已经绑定了。

每一个SelectionKey被包装为SocketProcessor对象后丢入到队列中,这里的队列设置如下:

private LinkedBlockingQueue<SocketProcessor> task = new LinkedBlockingQueue<>();

3.3 任务处理线程

任务处理线程主要是初始化一定量的线程后,所有的线程等待从队列中获取任务,获取到任务后,读取请求信息后,进行包装请求Request和Response.同时判断当前请求的类型,来进行分发处理。基础的代码如下:

public AcceptHandler executor(){
        for(int i=0;i<10;i++){
            pool.submit(new ProcessorExecutorTask());
        }
        return this;
    }
   public class ProcessorExecutorTask implements Runnable{
        @Override
        public void run() {
            while(true){
                try{
                    //从队列中读取一个任务开始进行处理
                    SocketProcessor processor = task.take();
                    //开始处理数据
                    processor.run();
                }catch (Exception e){
                    e.printStackTrace();
                }
            }
        }
    }
public class SocketProcessor {

        private static final String API_DATA_START = "/api";
        private static final String STATIC_RESOURCE_URI = "static";

        private Map<String,String> relation = new HashMap<>();

        private SelectionKey sk;

        public SocketProcessor(SelectionKey sk){
            this.sk = sk;

            relation.put("/api/test","com.simple.nio.tomcat.TestServlet");
        }

        public void run() throws IOException{
            boolean closeSocket = false;
            if (sk.isReadable()) {
                //读取数据
                Request request = new Request(sk);
                request.prepareParseRequest();
                //写入数据
                Response response = new Response(sk,request);
                processorRequest(request,response);

                closeSocket = true;
            }
            if (!closeSocket && sk.isWritable()) {
                closeSocket = true;
            }
            if (closeSocket) {
                if (sk.isValid()) {
                    sk.cancel();
                }
                if(sk.channel().isOpen()){
                    sk.channel().close();
                }
            }
        }

        /**
         * 处理请求
         * @param request
         * @param response
         */
        public void processorRequest(Request request,Response response){
            try {
                //针对不同的请求字段,进行分开处理,如果是静态资源的数据,则使用StaticResourceProcessor,Api接口请求,则使用ApiResouceProcessor
                if(request.getRequestPath().startsWith(API_DATA_START)){
                    new ApiResouceProcessor().proccess(request,response);
                }else{
                    new StaticResourceProcessor().proccess(request,response);
                }
            }catch (Exception e){
                e.printStackTrace();
            }
        }

        /**
         * 静态资源的处理
         */
        public class StaticResourceProcessor {
            /**
             * 开始处理请求
             * @param request
             * @param response
             * @throws IOException
             */
            public void proccess(Request request, Response response) throws Exception {
                //就是上面的那个字符串截取方法
                String uri = request.getRequestPath();
                //获取uri对应的静态资源
                URL ROOT = this.getClass().getClassLoader().getResource(STATIC_RESOURCE_URI);
                File staticResource = new File(ROOT.getPath() + uri);
                byte[] bytes = null;
                if(staticResource.exists() ||staticResource.isFile()){
                    bytes = IOUtils.toByteArray(new FileInputStream(staticResource));
                }else{
                    staticResource = new File(ROOT.getPath() + "/404.html");
                    bytes = IOUtils.toByteArray(new FileInputStream(staticResource));
                }
                response.writer(bytes);
            }
        }
        /**
         * 接口资源的处理
         */
        public class ApiResouceProcessor{
            /**
             * 开始处理请求
             * @param request
             * @param response
             * @throws IOException
             */
            public void proccess(Request request, Response response) throws Exception {
                //就是上面的那个字符串截取方法
                String uri = request.getRequestPath();
                //找到对应的路径地址对应的servlt
                String clazz = relation.getOrDefault(uri,null);
                if (clazz == null || clazz.length() <= 0 ) {
                    throw new RuntimeException("no mapping relation of servlet");
                }
                //实例化该servlet对象
                Class<?> servletClass = this.getClass().getClassLoader().loadClass(clazz);
                //实例化对象
                Servlet servlet = (Servlet)servletClass.newInstance();
                //调用servlet的service方法
                servlet.service(request,response);
            }
        }
    }

ProcessorExecutorTask中很简单,只是从队列中获取任务等待处理,这里是take()方法是阻塞的,知道有任务可以获取到时返回。

SocketProcessor中主要是处理任务了,从SelectionKey中读取当前状态,如果是可读的,那么开始构造Request对象和Response对象。然后开始处理。

通过判断当前SelectionKey对象的状态来判断进行何种处理:

1 如果是OP_WRITE,则标记当前已经处理完了,关闭连接。
2 如果是OP_READ,则构造对象开始处理任务,同时判断是静态资源文件还是api接口

针对请求的处理:

1 如果是静态资源文件,则读取对应的文件处理后,包装为字节数组,写入到管道中国
2 如果是api请求,则找到当前地址对应的Servlet,反射实例化对应的Servlet,然后执行service方法。

这里针对api接口的处理逻辑中,可以学习springboot类似的逻辑,通过注解的形式,将这种映射关系先实例化好,请求过来直接进行处理。具体的后面学习springboot源码是在看。

3.4 请求对象和响应对象的包装

Request对象的包装代码如下:

public class Request implements ServletRequest {
    /**
     * 询Key
     */
    private SelectionKey sk;
    /**
     * 解析用户请求后的URI
     */
    private String requestStr;

    private String requestMethod;
    private String requestPath;
    private Map<String,String> header = new HashMap<>();

    public Request(SelectionKey sk) {
        this.sk = sk;
    }

    /**
     * 解析获取到的数据,丰富当前的request数据
     */
    public void prepareParseRequest() throws IOException {
        String header = readMessage(sk,1024);
        this.requestStr = header;

        List<String> list = Arrays.asList(requestStr.split("\r\n"));
//        //解析请求方式,请求地址,http协议
        parseMethodAndPathFromRequestStr(list.get(0));
//        //解析请求头
        parserRequestHeaderFromRequestStr(list);
    }

    private String readMessage(SelectionKey sk,int len) throws IOException{
        //重置缓冲区,等待写入
        ByteBuffer buffer = ByteBuffer.allocate(len);
        //服务端想客户端写入数据
        SocketChannel channel = (SocketChannel)sk.channel();
        int num = channel.read(buffer);
        return num > 0 ? new String(buffer.array(), 0, num):"";
    }

    /**
     * 解析请求方式
     * 解析请求地址
     * 解析请求协议
     * @param path
     */
    private void parseMethodAndPathFromRequestStr(String path) throws IOException{
        //针对请求头进行处理
        List<String> strs = Arrays.asList(path.split(" "));
        //切分好以后,进行参数结果判断
        if (null == strs || strs.size() != 3) {
            throw new IOException("error request");
        }
        //开始进行相应数据的赋值
        this.requestMethod = strs.get(0);
        this.requestPath = strs.get(1);
        //开始解析路径参数
        //解析参数
    }
    /**
     * 解析请求头
     * @param list
     */
    private void parserRequestHeaderFromRequestStr(List<String> list) throws IOException{
        if(list.size() <= 2){
            return;
        }
        List<String> array = list.subList(1,list.size() - 1);
        //循环遍历,将字符串进行切割后
        for(String item : array){
            String[] strs = item.split(": ");
            if (strs == null || strs.length != 2) {
                throw new IOException("error request");
            }
            header.put(strs[0],strs[1]);
        }
    }
    xxxxxxxxxxx
}

Request主要是当请求进来以后,从管道中读取对应的信息,并解析请求地址,请求头数据等字段,这里只是用到了请求地址数据,其他的未做处理。

Response的包装代码如下:

public class Response implements ServletResponse {

    private Request request;
    private SelectionKey sk;

    public Response(SelectionKey sk, Request request){
        this.sk = sk;
        this.request = request;
    }

    public void println(String message){
        writer(200,"OK",message.getBytes());
    }

    public void writer(byte[] bytes){
        writer(200,"OK",bytes);
    }

    public void writer(int code,String message,byte[] bytes){
        String head = "HTTP/1.1 " + code + " " + message + "\r\n\r\n";

        byte[] headBytes = head.getBytes();
        byte[] result = new byte[headBytes.length + bytes.length];

        System.arraycopy(headBytes,0, result,0, headBytes.length);
        System.arraycopy(bytes,0, result, headBytes.length, bytes.length);

        //重置缓冲区,等待写入
        ByteBuffer buffer = ByteBuffer.allocate(result.length);
        buffer.put(result);
        buffer.flip();
        //服务端想客户端写入数据
        SocketChannel channel = (SocketChannel)sk.channel();
        try {
            channel.write(buffer);
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
    xxxxxxxxx
 }

这里的代码其实很少,主要是向管道中写入内容。其他的没有什么。

对应的Servlet中向Response中写入内容时,调用的是:

((Response)response).println("Hello Servlet!");

最后是测试结果:
源码学习-tomcat的简单实现(二)
源码学习-tomcat的简单实现(二)

相关文章:

  • 2021-12-17
  • 2021-07-06
  • 2021-12-09
  • 2022-12-23
  • 2021-11-09
  • 2021-12-04
  • 2021-12-20
  • 2022-12-23
猜你喜欢
  • 2021-11-02
  • 2021-06-29
  • 2021-04-21
  • 2022-12-23
  • 2021-06-09
  • 2021-10-10
  • 2021-11-28
相关资源
相似解决方案