(一)先将Server运行起来

 public static void serverStart() throws IOException {
        //channel bytebuffer selector 三兄弟
        ServerSocketChannel channel = ServerSocketChannel.open();
        channel.socket().bind(new InetSocketAddress(8087));
        channel.configureBlocking(false);
        Selector selector = Selector.open();
        channel.register(selector, SelectionKey.OP_ACCEPT);
        while (true) {
            int select = selector.select();
            if (select == 0) {
                continue;
            }
            System.out.println(111);
            Iterator<SelectionKey> iterators = selector.selectedKeys().iterator();
            while (iterators.hasNext()) {
                SelectionKey selectionKey = iterators.next();
                //accept,read,write
                if (selectionKey.isAcceptable()) {
                    SocketChannel socket = ((ServerSocketChannel) selectionKey.channel()).accept();
                    socket.register(selector, SelectionKey.OP_READ);
                } else if (selectionKey.isReadable()) {
                    SocketChannel socket = ((ServerSocketChannel) selectionKey.channel()).accept();
                    socket.register(selector, SelectionKey.OP_WRITE);
                } else {
                    //deal message
                }
                iterators.remove();
            }
        }
    }

(二)分析下内部结构

(1) ServerSocketChannel channel = ServerSocketChannel.open();

获取ServerSocketChannel

    //ServerSocketChannel.class
    public static ServerSocketChannel open() throws IOException {
        return SelectorProvider.provider().openServerSocketChannel();
    }
     //SelectorProviderImpl.class
     public ServerSocketChannel openServerSocketChannel() throws IOException {
        return new ServerSocketChannelImpl(this);
    }
	//ServerSocketChannelImpl.class
    ServerSocketChannelImpl(SelectorProvider var1) throws IOException {
        super(var1);
        //打开一个socket,并返回一个文件描述符
        this.fd = Net.serverSocket(true);
        //Fd 对应得value
        this.fdVal = IOUtil.fdVal(this.fd);
        this.state = 0;
    }

(2) Selector selector = Selector.open();

   public static Selector open() throws IOException {
        return SelectorProvider.provider().openSelector();
    }
    // SelectorProvider.class 显示创建一个selectorprovider
    public static SelectorProvider provider() {
        //单例模式 确保程序中只有一个 selectorProvider对象
        synchronized (lock) {
            if (provider != null)
                return provider;
            return AccessController.doPrivileged(
                new PrivilegedAction<SelectorProvider>() {
                    public SelectorProvider run() {
                            if (loadProviderFromProperty())
                                return provider;
                            if (loadProviderAsService())
                                return provider;
                            //根据操作系统来 创建所需要得provider ,windows/linux
                            provider = sun.nio.ch.DefaultSelectorProvider.create();
                            return provider;
                        }
                    });
        }
    }
    // WindowsSelectorProvider.class
	public AbstractSelector openSelector() throws IOException {
        return new WindowsSelectorImpl(this);
    }
    //WindowsSelectorImpl.class
    WindowsSelectorImpl(SelectorProvider var1) throws IOException {
        super(var1);
        //获取 source端得FD
        this.wakeupSourceFd = ((SelChImpl)this.wakeupPipe.source()).getFDVal();
        SinkChannelImpl var2 = (SinkChannelImpl)this.wakeupPipe.sink();
        var2.sc.socket().setTcpNoDelay(true);
        //获取 Sink端得FD
        this.wakeupSinkFd = var2.getFDVal();
        //将注册到selector中得channel FD放进pollWrapper,selector 进行轮询,查询可以用得channel
        this.pollWrapper.addWakeupSocket(this.wakeupSourceFd, 0);
    }
    
    //WindowsSelectorImpl
    主要创建了一个Pipe管道,其中sourceChannel 是数据得写入通道,SinkChannel是文件得读取通道,分别位于Pipe的两端;64位得情况下高32是read Pipe,32是write Pipe
    private final Pipe wakeupPipe = Pipe.open();
	class PipeImpl extends Pipe {
	    private static final int NUM_SECRET_BYTES = 16;
	    private static final Random RANDOM_NUMBER_GENERATOR = new SecureRandom();
	    private SourceChannel source;
	    private SinkChannel sink;

	    PipeImpl(SelectorProvider var1) throws IOException {
	        try {
	            AccessController.doPrivileged(new PipeImpl.Initializer(var1));
	        } catch (PrivilegedActionException var3) {
	            throw (IOException)var3.getCause();
	        }
	    }
	 }

(3) channel.register(selector, SelectionKey.OP_ACCEPT);

将前面创建的ServerSocketChannel 注册到selector 中,

   public final SelectionKey register(Selector sel, int ops, Object att) throws ClosedChannelException
   {
        synchronized (regLock) {
            if (!isOpen())
                throw new ClosedChannelException();
            if ((ops & ~validOps()) != 0)
                throw new IllegalArgumentException();
            if (blocking)
                throw new IllegalBlockingModeException();
            //查找在selector (private SelectionKey[] keys = null;)中是否已经有个这个channel
            SelectionKey k = findKey(sel);
            if (k != null) {
                //如果有了  改变记录中中channel 感兴趣的事件
                k.interestOps(ops);
                k.attach(att);
            }
            if (k == null) {
                // New registration
                synchronized (keyLock) {
                    if (!isOpen())
                        throw new ClosedChannelException();
                     //新创建一个SelectionKey
                    k = ((AbstractSelector)sel).register(this, ops, att);
                    addKey(k);
                }
            }
            return k;
        }
    }
	protected final SelectionKey register(AbstractSelectableChannel var1, int var2, Object var3) {
	        if (!(var1 instanceof SelChImpl)) {
	            throw new IllegalSelectorException();
	        } else {
	            //新创建
	            SelectionKeyImpl var4 = new SelectionKeyImpl((SelChImpl)var1, this);
	            var4.attach(var3);
	            Set var5 = this.publicKeys;
	            synchronized(this.publicKeys) {
	                this.implRegister(var4);
	            }
	
	            var4.interestOps(var2);
	            return var4;
	        }
    }
    //其中 
    SelectionKeyImpl(SelChImpl var1, SelectorImpl var2) {
        this.channel = var1;
        this.selector = var2;
    }
    //将新生成得selectionkey 记录下来
    protected void implRegister(SelectionKeyImpl var1) {
        Object var2 = this.closeLock;
        synchronized(this.closeLock) {
            if (this.pollWrapper == null) {
                throw new ClosedSelectorException();
            } else {
                //pollwrapper 扩张
                this.growIfNeeded();
                this.channelArray[this.totalChannels] = var1;
                var1.setIndex(this.totalChannels);
                //记录selectionKey
                this.fdMap.put(var1);
                this.keys.add(var1);
                //新channel fd 记录下来
                this.pollWrapper.addEntry(this.totalChannels, var1);
                ++this.totalChannels;
            }
        }
    }

(4)开始轮询 int select = selector.select();

获得的结果 select > 0 ,标明有事件需要处理,那么久开始在本地缓存中 处理感兴趣得事件

private int lockAndDoSelect(long var1) throws IOException {
        synchronized(this) {
            if (!this.isOpen()) {
                throw new ClosedSelectorException();
            } else {
                Set var4 = this.publicKeys;
                int var10000;
                synchronized(this.publicKeys) {
                    Set var5 = this.publicSelectedKeys;
                    synchronized(this.publicSelectedKeys) {
                        //开始select
                        var10000 = this.doSelect(var1);
                    }
                }

                return var10000;
            }
        }
    }
     protected int doSelect(long var1) throws IOException {
        if (this.channelArray == null) {
            throw new ClosedSelectorException();
        } else {
            this.timeout = var1;
            this.processDeregisterQueue();
            if (this.interruptTriggered) {
                this.resetWakeupSocket();
                return 0;
            } else {
                this.adjustThreadsCount();
                this.finishLock.reset();
                this.startLock.startThreads();

                try {
                    this.begin();

                    try {
                        //主要方法  在这里 他是一个内部内
                        this.subSelector.poll();
                    } catch (IOException var7) {
                        this.finishLock.setException(var7);
                    }

                    if (this.threads.size() > 0) {
                        this.finishLock.waitForHelperThreads();
                    }
                } finally {
                    this.end();
                }
                this.finishLock.checkForException();
                this.processDeregisterQueue();
                //内部内 subSelector poll之后会获得数组信息,read[] write[],更新改变本地缓存得SelectionKey信息
                int var3 = this.updateSelectedKeys();
                this.resetWakeupSocket();
                return var3;
            }
        }
    }
    //内部类 SubSelector 
    private final class SubSelector {
        //index
        private final int pollArrayIndex;
        //select 读取之后 可读得FD
        private final int[] readFds;
        //select 读取之后 可写得FD
        private final int[] writeFds;
        private final int[] exceptFds;

        private SubSelector() {
            this.readFds = new int[1025];
            this.writeFds = new int[1025];
            this.exceptFds = new int[1025];
            this.pollArrayIndex = 0;
        }

        private SubSelector(int var2) {
            this.readFds = new int[1025];
            this.writeFds = new int[1025];
            this.exceptFds = new int[1025];
            this.pollArrayIndex = (var2 + 1) * 1024;
        }

        // 开始去 poll 这个方法也是阻塞得,设置timeout之后,在规定时间内pollArray对应得FD 里面没有改变,超过timeout 就会返回 
        private int poll() throws IOException {
            return this.poll0(WindowsSelectorImpl.this.pollWrapper.pollArrayAddress, Math.min(WindowsSelectorImpl.this.totalChannels, 1024), this.readFds, this.writeFds, this.exceptFds, WindowsSelectorImpl.this.timeout);
        }

(三)自我总结下运行过程

  1. 开启一个ServerSocketChannel,本地保存一个FD 并绑定port;
  2. 启动一个Selector,开通一个通道pipe,写source,读sink 两端,初始化pollWrapper,保存注册到selector中得channel 对应得socket FD
  3. 将SocketChannel 生成selectorKey注册到selector中,并保存下来
  4. 开始轮询 while(true),调用子类subSelector.poll() 获得底层端口监听得数据,如果有更新,就返回到程序中,并更新缓存中得数据, 对publicSelectionKey进行遍历,处理感兴趣得事件

![avatar](java网络编程:NIO

相关文章: