1 //创建一个AsynchronousSocketChannel实例,
2 //AsynchronousChannelGroup在linux中是EPollPort,在windows中是Iopc,
3 //AsynchronousChannelGroup内持有fileDescriptor到channel的映射,从epoll返回的事件可以间接的找到fileDescriptor,通过映射找到channel,从而完成io;
4 //AsynchronousChannelGroup还持有线程池,自动开启,用于处理io,执行CompletionHandler。
5 //线程池默认是一个new ThreadPool(Executors.newCachedThreadPool(threadFactory), isFixed, poolSize),
6 //并启动poolSize个线程,poolSize的大小为Runtime.getRuntime().availableProcessors();
7 //线程池由EPollPort(AsynchronousChannelGroupImpl的实现类)管理,会详细说EPollPort
8 public static AsynchronousSocketChannel open(AsynchronousChannelGroup group)
9 throws IOException
10 {
11 AsynchronousChannelProvider provider = (group == null) ?
12 AsynchronousChannelProvider.provider() : group.provider();
13 return provider.openAsynchronousSocketChannel(group);
14 }
![]()
1 public AsynchronousSocketChannel openAsynchronousSocketChannel(AsynchronousChannelGroup group)
2 throws IOException
3 {
4 return new UnixAsynchronousSocketChannelImpl(toPort(group));
5 }
6
7 private Port toPort(AsynchronousChannelGroup group) throws IOException {
8 if (group == null) {
9 return defaultEventPort();
10 } else {
11 if (!(group instanceof EPollPort))
12 throw new IllegalChannelGroupException();
13 return (Port)group;
14 }
15 }
16
17 private EPollPort defaultEventPort() throws IOException {
18 if (defaultPort == null) {
19 synchronized (LinuxAsynchronousChannelProvider.class) {
20 if (defaultPort == null) {
21 //默认会创建一个Executors.newCachedThreadPool(threadFactory),并启动poolSize个线程,poolSize在创建ThreadPool时会设置
22 defaultPort = new EPollPort(this, ThreadPool.getDefault()).start();
23 }
24 }
25 }
26 return defaultPort;
27 }
28
29 static ThreadPool createDefault() {
30 int poolSize = getDefaultThreadPoolInitialSize();
31 if (poolSize < 0) {
32 poolSize = Runtime.getRuntime().availableProcessors();//与核心数相同,双核四线程,availableProcessors返回4
33 }
34
35 ThreadFactory threadFactory = getDefaultThreadPoolThreadFactory();
36 if (threadFactory == null) {
37 threadFactory = defaultThreadFactory();
38 }
39
40 ExecutorService executorService = Executors.newCachedThreadPool(threadFactory);
41 boolean isFixed = false;
42 return new ThreadPool(executorService, isFixed, poolSize);
43 }
44
45 //Executors.newCachedThreadPool(threadFactory)中的threadFactory
46 static ThreadFactory defaultThreadFactory() {
47 return System.getSecurityManager() == null ? (var0) -> {//该ThreadFactory每次newThread都会返回一个Thread
48 Thread var1 = new Thread(var0);
49 var1.setDaemon(true);
50 return var1;
51 } : (var0) -> {
52 PrivilegedAction var1 = () -> {
53 InnocuousThread var1 = new InnocuousThread(var0);
54 var1.setDaemon(true);
55 return var1;
56 };
57 return (Thread)AccessController.doPrivileged(var1);
58 };
59 }
60
61 //close时会关闭线程池
62 void implClose() throws IOException {
63 // remove the mapping
64 port.unregister(fdVal);
65
66 // close file descriptor
67 nd.close(fd);
68
69 // All outstanding I/O operations are required to fail
70 finish(false, true, true);
71 }
72 final void unregister(int fd) {
73 boolean checkForShutdown = false;
74
75 fdToChannelLock.writeLock().lock();
76 try {
77 fdToChannel.remove(Integer.valueOf(fd));
78 // last key to be removed so check if group is shutdown
79 if (fdToChannel.isEmpty())
80 checkForShutdown = true;
81 } finally {
82 fdToChannelLock.writeLock().unlock();
83 }
84
85 // continue shutdown
86 if (checkForShutdown && isShutdown()) {
87 try {
88 //关闭线程池
89 shutdownNow();
90 } catch (IOException ignore) { }
91 }
92 }
View Code