(一)先将Server运行起来
/**
 * Starts a single-threaded, non-blocking NIO server on port 8087 and runs its
 * selector loop forever.
 *
 * <p>New connections are accepted, switched to non-blocking mode and registered
 * for {@code OP_READ}. When a connection becomes readable its bytes are read into
 * a buffer that is attached to the key, and the key is switched to
 * {@code OP_WRITE}. When the connection becomes writable the message is handled
 * and the key is switched back to {@code OP_READ}.
 *
 * @throws IOException if the server channel or selector cannot be opened or
 *     bound, or if selecting, accepting or closing a channel fails
 */
public static void serverStart() throws IOException {
    // The three NIO building blocks: Channel, ByteBuffer and Selector.
    ServerSocketChannel channel = ServerSocketChannel.open();
    channel.socket().bind(new InetSocketAddress(8087));
    channel.configureBlocking(false);
    Selector selector = Selector.open();
    channel.register(selector, SelectionKey.OP_ACCEPT);
    while (true) {
        int select = selector.select();
        if (select == 0) {
            continue;
        }
        System.out.println(111);
        Iterator<SelectionKey> iterators = selector.selectedKeys().iterator();
        while (iterators.hasNext()) {
            SelectionKey selectionKey = iterators.next();
            // Remove the key up front so that no early exit below can leave it
            // in the selected set and have it processed again next round.
            iterators.remove();
            if (!selectionKey.isValid()) {
                continue;
            }
            // accept, read, write
            if (selectionKey.isAcceptable()) {
                SocketChannel socket = ((ServerSocketChannel) selectionKey.channel()).accept();
                // A non-blocking accept() returns null when no connection is pending.
                if (socket != null) {
                    // A channel must be non-blocking before it can be registered,
                    // otherwise register() throws IllegalBlockingModeException.
                    socket.configureBlocking(false);
                    socket.register(selector, SelectionKey.OP_READ);
                }
            } else if (selectionKey.isReadable()) {
                // A readable key belongs to an accepted connection, i.e. a
                // SocketChannel, never to the listening ServerSocketChannel.
                SocketChannel socket = (SocketChannel) selectionKey.channel();
                java.nio.ByteBuffer buffer = java.nio.ByteBuffer.allocate(1024);
                int read;
                try {
                    read = socket.read(buffer);
                } catch (IOException connectionFailure) {
                    // A broken connection (e.g. reset by peer) must not take the
                    // whole server down; treat it like end-of-stream.
                    read = -1;
                }
                if (read < 0) {
                    // Peer closed the connection; closing the channel also cancels its key.
                    socket.close();
                } else if (read > 0) {
                    buffer.flip();
                    selectionKey.attach(buffer);
                    selectionKey.interestOps(SelectionKey.OP_WRITE);
                }
            } else if (selectionKey.isWritable()) {
                // deal message: the bytes that were read are in the attached ByteBuffer.
                selectionKey.attach(null);
                // Go back to waiting for input; leaving OP_WRITE set would make
                // select() return immediately on every iteration.
                selectionKey.interestOps(SelectionKey.OP_READ);
            }
        }
    }
}
(二)分析下内部结构
(1) ServerSocketChannel channel = ServerSocketChannel.open();
获取ServerSocketChannel
//ServerSocketChannel.class
/**
 * Opens a server-socket channel by delegating to the provider returned by
 * {@code SelectorProvider.provider()}.
 *
 * @return the channel created by the provider
 * @throws IOException if the provider fails to open the channel
 */
public static ServerSocketChannel open() throws IOException {
    SelectorProvider defaultProvider = SelectorProvider.provider();
    return defaultProvider.openServerSocketChannel();
}
//SelectorProviderImpl.class
/**
 * Creates the concrete server-socket channel, passing this provider to it.
 *
 * @return a new {@code ServerSocketChannelImpl} bound to this provider
 * @throws IOException if the channel implementation cannot be constructed
 */
public ServerSocketChannel openServerSocketChannel() throws IOException {
    ServerSocketChannel serverChannel = new ServerSocketChannelImpl(this);
    return serverChannel;
}
//ServerSocketChannelImpl.class
// Constructs the server-socket channel implementation.
// var1 is the SelectorProvider that is handed to the superclass constructor.
ServerSocketChannelImpl(SelectorProvider var1) throws IOException {
super(var1);
// Net.serverSocket(true) supplies the file descriptor stored in fd
// (presumably it opens the underlying OS socket; that code is not shown here).
this.fd = Net.serverSocket(true);
// Cache the value IOUtil.fdVal returns for that descriptor
// (presumably the descriptor's raw integer handle).
this.fdVal = IOUtil.fdVal(this.fd);
// Initial state; what the value 0 stands for is defined elsewhere in the class.
this.state = 0;
}
(2) Selector selector = Selector.open();
/**
 * Opens a selector by delegating to the provider returned by
 * {@code SelectorProvider.provider()}.
 *
 * @return the selector created by the provider
 * @throws IOException if the provider fails to open the selector
 */
public static Selector open() throws IOException {
    SelectorProvider defaultProvider = SelectorProvider.provider();
    return defaultProvider.openSelector();
}
// SelectorProvider.class:获取全局唯一的 SelectorProvider(首次调用时才创建)
/**
 * Returns the shared selector provider, creating it on first use.
 *
 * <p>The instance is cached in the static {@code provider} field and every
 * access happens while holding {@code lock}, so only one provider is created.
 * On first use the loaders are tried in order: {@code loadProviderFromProperty()},
 * then {@code loadProviderAsService()}; if neither reports success the default
 * provider from {@code sun.nio.ch.DefaultSelectorProvider.create()} is used.
 *
 * @return the cached provider
 */
public static SelectorProvider provider() {
    synchronized (lock) {
        if (provider == null) {
            return AccessController.doPrivileged(
                new PrivilegedAction<SelectorProvider>() {
                    public SelectorProvider run() {
                        // Short-circuit keeps the original order: the service
                        // loader is only consulted when the property loader fails.
                        boolean loaded = loadProviderFromProperty() || loadProviderAsService();
                        if (!loaded) {
                            // Fall back to the default provider for this platform.
                            provider = sun.nio.ch.DefaultSelectorProvider.create();
                        }
                        return provider;
                    }
                });
        }
        return provider;
    }
}
// WindowsSelectorProvider.class
/**
 * Creates the Windows selector implementation, passing this provider to it.
 *
 * @return a new {@code WindowsSelectorImpl} bound to this provider
 * @throws IOException if the selector implementation cannot be constructed
 */
public AbstractSelector openSelector() throws IOException {
    AbstractSelector windowsSelector = new WindowsSelectorImpl(this);
    return windowsSelector;
}
//WindowsSelectorImpl.class
// Constructs the Windows selector and wires up its wakeup pipe.
// var1 is the SelectorProvider that is handed to the superclass constructor.
WindowsSelectorImpl(SelectorProvider var1) throws IOException {
super(var1);
// Remember the FD value of the pipe's source end.
this.wakeupSourceFd = ((SelChImpl)this.wakeupPipe.source()).getFDVal();
SinkChannelImpl var2 = (SinkChannelImpl)this.wakeupPipe.sink();
// Disable Nagle's algorithm on the socket behind the sink end
// (presumably so that a wakeup write is delivered without delay; TODO confirm).
var2.sc.socket().setTcpNoDelay(true);
// Remember the FD value of the pipe's sink end.
this.wakeupSinkFd = var2.getFDVal();
// Register the wakeup source FD at index 0 of pollWrapper. This is the
// selector's own wakeup FD, not the FD of a channel registered by the user.
this.pollWrapper.addWakeupSocket(this.wakeupSourceFd, 0);
}
//WindowsSelectorImpl
WindowsSelectorImpl 主要创建了一个 Pipe 管道:其中 SinkChannel 是数据的写入通道,SourceChannel 是数据的读取通道,分别位于 Pipe 的两端;(在 Linux 的实现中,管道用一个 64 位的 long 表示,高 32 位是读端 FD,低 32 位是写端 FD)
// Pipe owned by this selector; the constructor registers its source end with
// pollWrapper as the wakeup socket and keeps the FD values of both ends.
private final Pipe wakeupPipe = Pipe.open();
// Pipe implementation (abridged excerpt: the nested Initializer class and the
// accessors for the two ends are not shown).
class PipeImpl extends Pipe {
// NOTE(review): not used in this excerpt; presumably the length of a random
// handshake secret used by Initializer. Confirm against the full source.
private static final int NUM_SECRET_BYTES = 16;
// SecureRandom instance shared by all PipeImpl instances.
private static final Random RANDOM_NUMBER_GENERATOR = new SecureRandom();
// The two ends of the pipe; they are not assigned in this excerpt
// (presumably Initializer sets them).
private SourceChannel source;
private SinkChannel sink;
// Runs PipeImpl.Initializer as a privileged action for the given provider.
PipeImpl(SelectorProvider var1) throws IOException {
try {
AccessController.doPrivileged(new PipeImpl.Initializer(var1));
} catch (PrivilegedActionException var3) {
// Unwrap and rethrow the cause; the cast assumes it is always an IOException.
throw (IOException)var3.getCause();
}
}
}
(3) channel.register(selector, SelectionKey.OP_ACCEPT);
将前面创建的 ServerSocketChannel 注册到 selector 中:
/**
 * Registers this channel with the given selector, or updates the existing
 * registration if there is one.
 *
 * @param sel the selector to register with
 * @param ops the interest set for the key
 * @param att the attachment for the key
 * @return the existing key (with its interest set and attachment replaced) or
 *     the newly created key
 * @throws ClosedChannelException if this channel is closed
 * @throws IllegalArgumentException if {@code ops} contains a bit that is not in
 *     {@code validOps()}
 * @throws IllegalBlockingModeException if this channel is in blocking mode
 */
public final SelectionKey register(Selector sel, int ops, Object att) throws ClosedChannelException
{
    synchronized (regLock) {
        if (!isOpen()) {
            throw new ClosedChannelException();
        }
        boolean hasUnsupportedOps = (ops & ~validOps()) != 0;
        if (hasUnsupportedOps) {
            throw new IllegalArgumentException();
        }
        if (blocking) {
            throw new IllegalBlockingModeException();
        }

        // Is this channel already registered with that selector?
        SelectionKey existing = findKey(sel);
        if (existing != null) {
            // Yes: just replace the interest set and the attachment.
            existing.interestOps(ops);
            existing.attach(att);
            return existing;
        }

        // No: create a new registration.
        synchronized (keyLock) {
            if (!isOpen()) {
                throw new ClosedChannelException();
            }
            SelectionKey created = ((AbstractSelector) sel).register(this, ops, att);
            addKey(created);
            return created;
        }
    }
}
/**
 * Creates a selection key for a channel and records it in this selector.
 *
 * @param var1 the channel being registered; must implement {@code SelChImpl}
 * @param var2 the interest set applied to the new key
 * @param var3 the attachment stored on the new key
 * @return the newly created key
 * @throws IllegalSelectorException if {@code var1} is not a {@code SelChImpl}
 */
protected final SelectionKey register(AbstractSelectableChannel var1, int var2, Object var3) {
    if (!(var1 instanceof SelChImpl)) {
        throw new IllegalSelectorException();
    }
    SelectionKeyImpl key = new SelectionKeyImpl((SelChImpl) var1, this);
    key.attach(var3);
    // The key is recorded while holding the publicKeys monitor.
    synchronized (publicKeys) {
        implRegister(key);
    }
    // The interest set is applied only after the key has been recorded.
    key.interestOps(var2);
    return key;
}
//其中
/**
 * Creates a key that ties one channel to one selector.
 *
 * @param var1 the channel this key belongs to
 * @param var2 the selector this key is registered with
 */
SelectionKeyImpl(SelChImpl var1, SelectorImpl var2) {
    selector = var2;
    channel = var1;
}
//将新生成得selectionkey 记录下来
/**
 * Records a newly created selection key in this selector's bookkeeping.
 *
 * @param var1 the key to record
 * @throws ClosedSelectorException if {@code pollWrapper} is null
 */
protected void implRegister(SelectionKeyImpl var1) {
    synchronized (closeLock) {
        if (pollWrapper == null) {
            throw new ClosedSelectorException();
        }
        // Make room first (grows the backing storage when necessary).
        growIfNeeded();
        // The key goes into the next free slot and is told its index.
        channelArray[totalChannels] = var1;
        var1.setIndex(totalChannels);
        // Record the key in the FD map and in the key set.
        fdMap.put(var1);
        keys.add(var1);
        // Add the new channel's entry to pollWrapper at the same slot.
        pollWrapper.addEntry(totalChannels, var1);
        ++totalChannels;
    }
}
(4)开始轮询 int select = selector.select();
获得的结果 select > 0,表明有事件需要处理,那么就开始在本地缓存中处理感兴趣的事件
/**
 * Runs one selection operation while holding the selector's locks.
 *
 * <p>Monitors are acquired in a fixed order: this selector, then
 * {@code publicKeys}, then {@code publicSelectedKeys}.
 *
 * @param var1 the timeout passed through to {@code doSelect}
 * @return whatever {@code doSelect} returns
 * @throws ClosedSelectorException if this selector is not open
 * @throws IOException if {@code doSelect} fails
 */
private int lockAndDoSelect(long var1) throws IOException {
    synchronized (this) {
        if (!isOpen()) {
            throw new ClosedSelectorException();
        }
        synchronized (publicKeys) {
            synchronized (publicSelectedKeys) {
                // The actual selection happens here.
                return doSelect(var1);
            }
        }
    }
}
// Performs one selection pass and returns the value of updateSelectedKeys().
// var1 is the timeout; it is stored in this.timeout, which SubSelector.poll() reads.
// Throws ClosedSelectorException when channelArray is null.
protected int doSelect(long var1) throws IOException {
if (this.channelArray == null) {
throw new ClosedSelectorException();
} else {
this.timeout = var1;
// Process the deregister queue before polling (presumably drops cancelled keys).
this.processDeregisterQueue();
if (this.interruptTriggered) {
// A wakeup is already pending: reset it and return 0 without polling.
this.resetWakeupSocket();
return 0;
} else {
// Prepare the helper threads and the start/finish locks for this pass
// (presumably one helper per additional batch of channels; TODO confirm).
this.adjustThreadsCount();
this.finishLock.reset();
this.startLock.startThreads();
try {
this.begin();
try {
// The core call: poll through the SubSelector inner class.
this.subSelector.poll();
} catch (IOException var7) {
// Stash the failure; it is checked after end() via checkForException().
this.finishLock.setException(var7);
}
// Wait for the helper threads only when there are any.
if (this.threads.size() > 0) {
this.finishLock.waitForHelperThreads();
}
} finally {
this.end();
}
this.finishLock.checkForException();
this.processDeregisterQueue();
// After polling, fold the results into the selected keys (presumably from the
// SubSelector's readFds/writeFds/exceptFds arrays) and count the updates.
int var3 = this.updateSelectedKeys();
this.resetWakeupSocket();
return var3;
}
}
}
//内部类 SubSelector
// Inner class that performs the actual poll call for WindowsSelectorImpl.
// (Excerpt is abridged: poll0 and the end of the class are not shown.)
private final class SubSelector {
// Starting index of this sub-selector (0 for the no-arg constructor).
private final int pollArrayIndex;
// Passed to poll0; presumably filled with the FDs that are readable after the call.
private final int[] readFds;
// Passed to poll0; presumably filled with the FDs that are writable after the call.
private final int[] writeFds;
// Passed to poll0; presumably filled with the FDs in an exceptional state.
private final int[] exceptFds;
// Sub-selector that starts at index 0.
// NOTE(review): the arrays hold 1025 ints while poll() caps the count at 1024;
// presumably one extra slot carries a count. Confirm against poll0.
private SubSelector() {
this.readFds = new int[1025];
this.writeFds = new int[1025];
this.exceptFds = new int[1025];
this.pollArrayIndex = 0;
}
// Sub-selector number var2; its starting index is (var2 + 1) * 1024.
private SubSelector(int var2) {
this.readFds = new int[1025];
this.writeFds = new int[1025];
this.exceptFds = new int[1025];
this.pollArrayIndex = (var2 + 1) * 1024;
}
// Polls at most 1024 channels starting at pollArrayAddress, using the selector's timeout.
// Presumably poll0 blocks until an FD changes state or the timeout expires (poll0 is not shown).
private int poll() throws IOException {
return this.poll0(WindowsSelectorImpl.this.pollWrapper.pollArrayAddress, Math.min(WindowsSelectorImpl.this.totalChannels, 1024), this.readFds, this.writeFds, this.exceptFds, WindowsSelectorImpl.this.timeout);
}
(三)自我总结下运行过程
- 开启一个ServerSocketChannel,本地保存一个FD 并绑定port;
- 启动一个 Selector,开通一个管道 pipe(写端 sink、读端 source),初始化 pollWrapper,用来保存注册到 selector 中的 channel 所对应的 socket FD
- 将 ServerSocketChannel 生成 SelectionKey 注册到 selector 中,并保存下来
- 开始轮询 while(true),调用内部类 subSelector.poll() 获得底层端口监听到的数据,如果有更新,就返回到程序中,并更新缓存中的数据,然后对 publicSelectedKeys 进行遍历,处理感兴趣的事件
![avatar](