继上一篇的 IO 多路复用之后,我们已经从操作系统的层面上,解决了 IO 不必要等待的问题

不用再写类似的代码了:

while (true) {
      if (socket.isReadable) {
           doRead(sockert);        
      }
      sleep();        
}

本质上是驱动程序通过中断通知我们,有数据来到了 IO 口,我们可以去接收了。

现在,需要在 select , poll 或者 epoll 之上进行抽象,套一层能和应用对接的代码。 

这一层代码需要 下接 IO 多路复用,上接业务。也就是将 IO 设备传来的 数据交给业务代码。

一般的模式有 Reactor 和 Proactor

 

Reactor(反应堆模式):

  本质做的事情是从 IO 多路复用器(select,poll, epoll)上 得知某些 IO 设备是否可读/写/accept,如果可以的话,就将相应的事件路由给对应的处理器(Handler)

  Handler 完成业务逻辑后返回 结果,结果将在 写事件发生后传输给 可写的 IO 设备。

简单代码表示:

package reactor;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.Date;
import java.util.HashMap;
import java.util.Set;

public class Reactor implements Runnable{

    private Selector selector;
    private ServerSocketChannel serverSocketChannel;

    public Reactor() throws IOException {
        selector = Selector.open();
        serverSocketChannel = ServerSocketChannel.open();
        //TODO ? What is blocking?
        serverSocketChannel.bind(new InetSocketAddress("localhost", 5050), 1024);
        serverSocketChannel.configureBlocking(false);
        SelectionKey key = serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);
        key.attach(new Acceptor());
    }

    @Override
    public void run() {
        while (true) {
            try {
                // select 阻塞,直到有 IO 事件发生
                selector.select();
                Set<SelectionKey> keys = selector.selectedKeys();
                for (SelectionKey key : keys) {
                    dispatch(key);
                }
                // 清空, 不清空的话,之前的 selectionKey 会积累(相同的不会积累)
                // 某个IO设备感兴趣的事件 就算不发生,只要之前发生过,也会在 set 中
                // 加入这个 IO 设备注册的 selectionKey
                keys.clear();
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
    }

    // 路由发生的事件到处理器
    public void dispatch (SelectionKey key) {
        if (key.attachment() instanceof Runnable) {
            ((Runnable) key.attachment()).run();
        }
    }

    // 接受器
    public class Acceptor implements Handler {

        @Override
        public void run() {
            for (SelectionKey key : selector.keys()) {
                if (key.isAcceptable()) {
                    SocketChannel socketChannel = null;
                    try {
                        socketChannel = serverSocketChannel.accept();
                        if (socketChannel == null) {
                            return;
                        }
                        socketChannel.configureBlocking(false);
                        SelectionKey key1 = socketChannel.register(selector, SelectionKey.OP_READ);
                        key1.attach(new BusinessHandler(socketChannel, key1));
                    } catch (IOException e) {
                        e.printStackTrace();
                    }
                }
            }
        }
    }

    // 处理器
    public interface Handler extends Runnable{}

    public class BusinessHandler implements Runnable {

        private SocketChannel socketChannel;
        private SelectionKey key;
        private String result;

        public BusinessHandler(SocketChannel socketChannel, SelectionKey key) {
            this.socketChannel = socketChannel;
            this.key = key;
        }

        @Override
        public void run() {
            ByteBuffer byteBuffer = ByteBuffer.allocate(1024);
            try {
                if (key.isReadable()) {
                    socketChannel.read(byteBuffer);
                    // 业务逻辑处理
                    result = processBusinessLogic(byteBuffer);
                    // 必须使用 interestOps 改变在 select 上注册的监听事件
                    key.interestOps(SelectionKey.OP_WRITE);
                } else if (key.isWritable()) {
                    if (result != null) {
                        // 默认 limit 一开始是数组最大值,position 是 0,每写入一个字节 position + 1,limit 不变
                        byteBuffer.put(result.getBytes());
                        // 读取需要令 position = 0, limit = put写入数据的最大边界(之前的 position),flip 做的就是这件事
                        byteBuffer.flip();
                        socketChannel.write(byteBuffer);
                        // 必须使用 interestOps 改变在 select 上注册的监听事件
                        // 否则因为 select 是 LT (水平触发)模式,只要缓冲区可写就一直触发可写事件
                        key.interestOps(SelectionKey.OP_READ);
                    }
                }
            } catch (IOException e) {
                e.printStackTrace();
            }
        }

        // 处理业务逻辑
        public String processBusinessLogic(ByteBuffer byteBuffer) {
            // 读取需要令 position = 0 , limit = 写入数据的最大边界
            byteBuffer.flip();
            byte[] res = new byte[byteBuffer.limit() - byteBuffer.position()];
            byteBuffer.get(res);
            String request = new String(res);
            if ("What is the time".equals(request)) {
                return new Date().toString();
            }
            return "Unknown request";
        }
    }
}
View Code

相关文章: