继上一篇的 IO 多路复用之后,我们已经从操作系统的层面上,解决了 IO 不必要等待的问题
不用再写类似的代码了:
while (true) { if (socket.isReadable) { doRead(sockert); } sleep(); }
本质上是驱动程序通过中断通知我们,有数据来到了 IO 口,我们可以去接收了。
现在,需要在 select , poll 或者 epoll 之上进行抽象,套一层能和应用对接的代码。
这一层代码需要 下接 IO 多路复用,上接业务。也就是将 IO 设备传来的 数据交给业务代码。
一般的模式有 Reactor 和 Proactor
Reactor(反应堆模式):
本质做的事情是从 IO 多路复用器(select,poll, epoll)上 得知某些 IO 设备是否可读/写/accept,如果可以的话,就将相应的事件路由给对应的处理器(Handler)
Handler 完成业务逻辑后返回 结果,结果将在 写事件发生后传输给 可写的 IO 设备。
简单代码表示:
package reactor; import java.io.IOException; import java.net.InetSocketAddress; import java.nio.ByteBuffer; import java.nio.channels.SelectionKey; import java.nio.channels.Selector; import java.nio.channels.ServerSocketChannel; import java.nio.channels.SocketChannel; import java.util.Date; import java.util.HashMap; import java.util.Set; public class Reactor implements Runnable{ private Selector selector; private ServerSocketChannel serverSocketChannel; public Reactor() throws IOException { selector = Selector.open(); serverSocketChannel = ServerSocketChannel.open(); //TODO ? What is blocking? serverSocketChannel.bind(new InetSocketAddress("localhost", 5050), 1024); serverSocketChannel.configureBlocking(false); SelectionKey key = serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT); key.attach(new Acceptor()); } @Override public void run() { while (true) { try { // select 阻塞,直到有 IO 事件发生 selector.select(); Set<SelectionKey> keys = selector.selectedKeys(); for (SelectionKey key : keys) { dispatch(key); } // 清空, 不清空的话,之前的 selectionKey 会积累(相同的不会积累) // 某个IO设备感兴趣的事件 就算不发生,只要之前发生过,也会在 set 中 // 加入这个 IO 设备注册的 selectionKey keys.clear(); } catch (IOException e) { e.printStackTrace(); } } } // 路由发生的事件到处理器 public void dispatch (SelectionKey key) { if (key.attachment() instanceof Runnable) { ((Runnable) key.attachment()).run(); } } // 接受器 public class Acceptor implements Handler { @Override public void run() { for (SelectionKey key : selector.keys()) { if (key.isAcceptable()) { SocketChannel socketChannel = null; try { socketChannel = serverSocketChannel.accept(); if (socketChannel == null) { return; } socketChannel.configureBlocking(false); SelectionKey key1 = socketChannel.register(selector, SelectionKey.OP_READ); key1.attach(new BusinessHandler(socketChannel, key1)); } catch (IOException e) { e.printStackTrace(); } } } } } // 处理器 public interface Handler extends Runnable{} public class BusinessHandler implements Runnable { private SocketChannel socketChannel; private SelectionKey key; private String result; public BusinessHandler(SocketChannel socketChannel, SelectionKey key) { this.socketChannel = socketChannel; this.key = key; } @Override public void run() { ByteBuffer byteBuffer = ByteBuffer.allocate(1024); try { if (key.isReadable()) { socketChannel.read(byteBuffer); // 业务逻辑处理 result = processBusinessLogic(byteBuffer); // 必须使用 interestOps 改变在 select 上注册的监听事件 key.interestOps(SelectionKey.OP_WRITE); } else if (key.isWritable()) { if (result != null) { // 默认 limit 一开始是数组最大值,position 是 0,每写入一个字节 position + 1,limit 不变 byteBuffer.put(result.getBytes()); // 读取需要令 position = 0, limit = put写入数据的最大边界(之前的 position),flip 做的就是这件事 byteBuffer.flip(); socketChannel.write(byteBuffer); // 必须使用 interestOps 改变在 select 上注册的监听事件 // 否则因为 select 是 LT (水平触发)模式,只要缓冲区可写就一直触发可写事件 key.interestOps(SelectionKey.OP_READ); } } } catch (IOException e) { e.printStackTrace(); } } // 处理业务逻辑 public String processBusinessLogic(ByteBuffer byteBuffer) { // 读取需要令 position = 0 , limit = 写入数据的最大边界 byteBuffer.flip(); byte[] res = new byte[byteBuffer.limit() - byteBuffer.position()]; byteBuffer.get(res); String request = new String(res); if ("What is the time".equals(request)) { return new Date().toString(); } return "Unknown request"; } } }