选择器

最后,我们探索一下选择器。由于选择器内容比较多,所以本篇先偏理论地讲一下,后一篇讲代码,文章也没有什么概括、总结的,写到哪儿算哪儿了,只求能将选择器写明白,并且将一些相对重要的内容加粗标红。

选择器提供选择执行已经就绪的任务的能力,这使得多元I/O成为了可能,就绪执行和多元选择使得单线程能够有效地同时管理多个I/O通道。

某种程度上来说,理解选择器比理解缓冲区和通道类更困难一些和复杂一些,因为涉及了三个主要的类,它们都会同时参与到这整个过程中,这里先将选择器的执行分解为几条细节:

1、创建一个或者多个可选择的通道(SelectableChannel)

2、将这些创建的通道注册到选择器对象中

3、选择键会记住开发者关心的通道,它们也会追踪对应的通道是否已经就绪

4、开发者调用一个选择器对象的select()方法,当方法从阻塞状态返回时,选择键会被更新

5、获取选择键的集合,找到当时已经就绪的通道,通过遍历这些键,开发者可以选择对已就绪的通道要做的操作

对于选择器的操作,大致就是这么几步,OK,接下去再进一步,看一下和选择器相关的三个类。

 

选择器、可选择通道和选择键类

选择器(Selector)

选择器类管理着一个被注册的通道集合的信息和它们的就绪状态。通道是和选择器一起被注册的,并且使用选择器来更新通道的就绪状态。

可选择通道(SelectableChannel)

这个抽象类提供了实现通道的可选择性所需要的公共方法,它是所有支持就绪检查的通道类的父类,FileChannel对象不是可选择的,因为它们没有继承SelectableChannel,所有Socket通道都是可选择的,包括从管道(Pipe)对象中获得的通道。SelectableChannel可以被注册到Selector对象上,同时可以设定对哪个选择器而言哪种操作是感兴趣的。一个通道可以被注册到多个选择器上,但对每个选择器而言只能被注册一次。

选择键(SelectionKey)

选择键封装了特定的通道与特定的选择器的注册关系。调用SelectableChannel.register()方法会返回选择键并提供一个表示这种注册关系的标记。选择键包含了两个比特集(以整数形式进行编码),指示了该注册关系所关心的通道操作,以及通道已经准备好的操作。

用一张UML图来描述一下选择器、可选择通道和选择键:

Netty源码阅读(五):选择器

 

建立选择器

前面讲了,选择器的作用是管理了被注册的通道集合和它们的就绪状态,假设我们有三个Socket通道的选择器,可能会有类似的代码:

...
Selector selector = Selector.open(); 
channel1.register(selector, SelectionKey.OP_READ); 
channel2.register(selector, SelectionKey.OP_WRITE); 
channel3.register(selector, SelectionKey.OP_READ | OP_WRITE); 
channel4.register(selector, SelectionKey.OP_READ | OP_ACCEPT); 
ready = selector.select(10000); 
...

这种操作用图表示就是:

Netty源码阅读(五):选择器

代码创建了一个新的选择器,然后将这四个(已经存在的)Socket通道注册到选择器上,而且感兴趣的操作各不相同。select()方法在将线程置于睡眠状态直到这些感兴趣的事件中的一个发生或者10秒钟过去,这就是所谓的事件驱动

再稍微看一下Selector的API细节:

public abstract class Selector
{
    ...
    public static Selector open() throws IOException;
    public abstract boolean isOpen();
    public abstract void close() throws IOException;
    public abstract SelectionProvider provider();
    ...
}

Selector是通过调用静态工厂方法open()来实例化的,这个从前面的代码里面也看到了,选择器不是像通道或流那样的基本I/O对象----数据从来没有通过他们进行传递

通道是调用register方法注册到选择器上的,从代码里面可以看到register()方法接受一个Selector对象作为参数,以及一个名为ops的整数型参数,第二个参数表示关心的通道操作。在JDK1.4中,有四种被定义的可选择操作:读(read)、写(write)、连接(connect)和接受(accept)。

注意并非所有的操作都在所有的可选择通道上被支持,例如SocketChannel就不支持accept。

 

使用选择键

接下来看看选择键,选择键的API大致如下:

public abstract class SelectionKey
{
    public static final int OP_READ;
    public static final int OP_WRITE;
    public static final int OP_CONNECT;
    public static final int OP_ACCEPT;
    public abstract SelectableChannel channel();
    public abstract Selector selector();
    public abstract void cancel();
    public abstract boolean isValid();
    public abstract int interestOps();
    public abstract void iterestOps(int ops);
    public abstract int readyOps();
    public final boolean isReadable();
    public final boolean isWritable();
    public final boolean isConnectable();
    public final boolean isAcceptable();
    public final Object attach(Object ob);
    public final Object attachment();
}

关于这些API,总结几点:

1、就像前面提到的,一个键表示了一个特定的通道对象和一个特定的选择器对象之间的注册关系,channel()方法和selector()方法反映了这种关系

2、开发者可以使用cancel()方法终结这种关系,可以使用isValid()方法来检查这种有效的关系是否仍然存在,可以使用readyOps()方法来获取相关的通道已经就绪的操作

3、第2点有提到readyOps()方法,不过我们往往不需要使用这个方法,SelectionKey类定义了四个便于使用的布尔方法来为开发者测试通道的就绪状态,例如:

if (key.isWritable()){...}

这种写法就等价于:

if ((key.readyOps() & SelectionKeys.OPWRITE) != 0){...}

isWritable()、isReadable()、isConnectable()、isAcceptable()四个方法在任意一个SelectionKey对象上都能安全地调用。

4、当通道关闭时,所有相关的键会自动取消(一个通道可以被注册到多个选择器上);当选择器关闭时,所有被注册到该选择器的通道都会被注销并且相关的键立即被取消。

 

Selector维护的三种键

选择器维护者注册过的通道的集合,并且这些注册关系中的任意一个都是封装在SelectionKey对象中的。每一个Selector对象维护三种键的集合:

public abstract class Selector
{
    ...
    public abstract Set keys();
    public abstract Set selectedKeys();
    public abstract int select() throws IOException;
    public abstract int select(long timeout) throws IOException;
    public abstract int selectNow() throws IOException;
    public abstract void wakeup();
    ...   
}

由这个API看下去,这三种键是:

已注册的键的集合(Registered key set)

与选择器关联的已经注册的键的集合,并不是所有注册过的键都有效,这个集合通过keys()方法返回,并且可能是空的。这些键的集合是不可以直接修改的,试图这么做将引发java.lang.UnsupportedOperationException。

已选择的键的集合(Selected key set)

已注册的键的集合的子集,这个集合的每个成员都是相关的通道被选择器判断为已经准备好的并且包含于键的interest集合中的操作。这个集合通过selectedKeys()方法返回(有可能是空的)。

键可以直接从这个集合中移除,但不能添加。试图向已选择的键的集合中添加元素将抛出java.lang.UnsupportedOperationException。

已取消的键的集合(Cancelled key set)

已注册的键的集合的子集,这个集合包含了cancel()方法被调用过的键(这个键已经被无效化),但它们还没有被注销。这个集合是选择器对象的私有成员,因而无法直接访问。

 

选择过程

接着就是Selector的核心选择过程了。基本上来说,选择器是对select()、poll()、epoll()等本地调用或者类似的操作系统特定的系统调用的一个包装。但是Selector所做的不仅仅是简单地向本地代码传送参数,每个操作都有特定的过程,对这个过程的理解是合理地管理键和它们所表示的状态信息的基础。

选择操作是当三种形式的select()中的任意一种被调用时,由选择器执行的。不管是哪一种形式的调用,下面步骤将被执行:

1、已取消的键的集合将会被检查。如果它是非空的,每个已取消的键的集合中的键将从另外两个集合中移除,并且相关的通道将被注销。此步骤结束,已取消的键的集合将是空的。

2、已注册的键的集合中的键的interest集合将被检查,此步骤结束,对interest集合的改动不会影响剩余的检查过程。一旦就绪条件被定下来,底层操作系统将会进行查询,以确定每个通道所关心的操作的真实就绪状态,依赖于特定的select()方法调用,如果没有通道已经准备好,线程可能会在这时阻塞,通常会有一个超时值。

3、步骤2可能会花费很长时间,特别是线程处于阻塞状态时。与该选择器相关的键可能会同时被取消,当步骤2结束时,步骤1将重新执行,以完成任意一个在选择进行的过程中,键已经被取消的通道的注册。

4、select操作的返回值不是已准备好的通道的总数,而是从上一个select()调用之后进入就绪状态的通道的数量。之前的调用中就绪的,并且在本次调用中仍然就绪的通道不会被计入,而那些在前一次调用中已经就绪但已经不再处于就绪状态的通道也不会被计入。

最后,上面的Selector中还有两个方法没有提到,这里说明一下它们的意思:

1、selectNow()

调用selectNow()方法执行就绪检查过程,但不阻塞,如果当前没有通道就绪,立刻返回0.

2、wakeup()

调用wakeup()方法将使得选择器上的第一个还没有返回的选择操作立即返回,如果当前没有正在进行中的选择,那么下一次对select()方法的一种形式的调用将立即返回,后续的选择操作将正常进行。

 

选择器服务器端代码

上一篇文章毫无条理地讲了很多和选择器相关的知识点,下面进入实战,看一下如何写和使用选择器实现服务端Socket数据接收的程序,这也是NIO中最核心、最精华的部分。

看一下代码:

 1 public class SelectorServer
 2 {
 3     private static int PORT = 1234;
 4     
 5     public static void main(String[] args) throws Exception
 6     {
 7         // 先确定端口号
 8         int port = PORT;
 9         if (args != null && args.length > 0)
10         {
11             port = Integer.parseInt(args[0]);
12         }
13         // 打开一个ServerSocketChannel
14         ServerSocketChannel ssc = ServerSocketChannel.open();
15         // 获取ServerSocketChannel绑定的Socket
16         ServerSocket ss = ssc.socket();
17         // 设置ServerSocket监听的端口
18         ss.bind(new InetSocketAddress(port));
19         // 设置ServerSocketChannel为非阻塞模式
20         ssc.configureBlocking(false);
21         // 打开一个选择器
22         Selector selector = Selector.open();
23         // 将ServerSocketChannel注册到选择器上去并监听accept事件
24         ssc.register(selector, SelectionKey.OP_ACCEPT);
25         while (true)
26         {
27             // 这里会发生阻塞,等待就绪的通道
28             int n = selector.select();
29             // 没有就绪的通道则什么也不做
30             if (n == 0)
31             {
32                 continue;
33             }
34             // 获取SelectionKeys上已经就绪的通道的集合
35             Iterator<SelectionKey> iterator = selector.selectedKeys().iterator();
36             // 遍历每一个Key
37             while (iterator.hasNext())
38             {
39                 SelectionKey sk = iterator.next();
40                 // 通道上是否有可接受的连接
41                 if (sk.isAcceptable())
42                 {
43                     ServerSocketChannel ssc1 = (ServerSocketChannel)sk.channel();
44                     SocketChannel sc = ssc1.accept();
45                     sc.configureBlocking(false);
46                     sc.register(selector, SelectionKey.OP_READ);
47                 }
48                 // 通道上是否有数据可读
49                 else if (sk.isReadable())
50                 {
51                     readDataFromSocket(sk);
52                 }
53                 iterator.remove();
54             }
55         }
56     }
57     
58     private static ByteBuffer bb = ByteBuffer.allocate(1024);
59     
60     // 从通道中读取数据
61     protected static void readDataFromSocket(SelectionKey sk) throws Exception
62     {
63         SocketChannel sc = (SocketChannel)sk.channel();
64         bb.clear();
65         while (sc.read(bb) > 0)
66         {
67             bb.flip();
68             while (bb.hasRemaining())
69             {
70                 System.out.print((char)bb.get());
71             }
72             System.out.println();
73             bb.clear();
74         }
75     }
76 }

代码中已经有了相关的注释,这里继续解释一下:

(1)第8行~第12行,确定要监听的端口号,这里是1234

(2)第13行~第20行,由于选择器管理的是通道(Channel),因此首先要有通道。这里是服务器的程序,因此获取ServerSocketChannel,同时获取它所对应的ServerSocket,设置服务端的Channel为非阻塞模式,并绑定之前确定的端口号1234

(3)第21行~第24行,打开一个选择器,并注册当前通道感兴趣的时间为accept时间,即监听来自客户端的Socket数据

(4)第25行~第28行,调用select()方法等待来自客户端的Socket数据。程序会阻塞在这儿不会往下走,直到客户端有Socket数据的到来为止,所以严格意义上来说,NIO并不是一种非阻塞IO,因为NIO会阻塞在Selector的select()方法上

(5)第29行~第33行,没有什么好说的,如果select()方法获取的数据是0的话,下面的代码都没必要走,当然这是有可能发生的

(6)第34行~第39行,获取到已经就绪的通道的迭代器进行迭代,泛型是选择键SelectionKey,前文讲过,选择键用于封装特定的通道

(7)第40行~第52行,这里是一个关键点、核心点,这里做了两件事情:

  a)满足isAcceptable()则表示该通道上有数据到来了,此时我们做的事情不是获取该通道->创建一个线程来读取该通道上的数据,这么做就和前面一直讲的阻塞IO没有区别了,也无法发挥出NIO的优势来。我们做的事情只是简单地将对应的SocketChannel注册到选择器上,通过传入OP_READ标记,告诉选择器我们关心新的Socket通道什么时候可以准备好读数据

  b)满足isReadable()则表示新注册的Socket通道已经可以读取数据了,此时调用readDataFromSocket方法读取SocketChannel中的数据,读取数据的方法前面通道的文章中已经详细讲过了,就不讲了

(8)第53行,将键移除,这一行很重要也是容易忘记的一步操作。加入不remove,将会导致45行中出现空指针异常,原因不难理解,可以自己思考一下。

 

选择器客户端代码

选择器客户端的代码,没什么要求,只要向服务器端发送数据就可以了。这里选用的是Java NIO4:Socket通道一文中,最后一部分开五个线程向服务端发送数据的程序:

 1 public class SelectorClient
 2 {
 3     private static final String STR = "Hello World!";
 4     private static final String REMOTE_IP = "127.0.0.1";
 5     private static final int THREAD_COUNT = 5;
 6     
 7     private static class NonBlockingSocketThread extends Thread
 8     {
 9         public void run()
10         {
11             try
12             {
13                 int port = 1234;
14                 SocketChannel sc = SocketChannel.open();
15                 sc.configureBlocking(false);
16                 sc.connect(new InetSocketAddress(REMOTE_IP, port));
17                 while (!sc.finishConnect())
18                 {
19                     System.out.println("同" + REMOTE_IP + "的连接正在建立,请稍等!");
20                     Thread.sleep(10);
21                 }
22                 System.out.println("连接已建立,待写入内容至指定ip+端口!时间为" + System.currentTimeMillis());
23                 String writeStr = STR + this.getName();
24                 ByteBuffer bb = ByteBuffer.allocate(writeStr.length());
25                 bb.put(writeStr.getBytes());
26                 bb.flip(); // 写缓冲区的数据之前一定要先反转(flip)
27                 sc.write(bb);
28                 bb.clear();
29                 sc.close();
30             } 
31             catch (IOException e)
32             {
33                 e.printStackTrace();
34             } 
35             catch (InterruptedException e)
36             {
37                 e.printStackTrace();
38             }
39         }
40     }
41     
42     public static void main(String[] args) throws Exception
43     {
44         NonBlockingSocketThread[] nbsts = new NonBlockingSocketThread[THREAD_COUNT];
45         for (int i = 0; i < THREAD_COUNT; i++)
46             nbsts[i] = new NonBlockingSocketThread();
47         for (int i = 0; i < THREAD_COUNT; i++)
48             nbsts[i].start();
49         // 一定要join保证线程代码先于sc.close()运行,否则会有AsynchronousCloseException
50         for (int i = 0; i < THREAD_COUNT; i++)
51             nbsts[i].join();
52     }
53 }

 

代码执行结果

先运行服务端程序:

Netty源码阅读(五):选择器

空白,很正常,因为在监听客户端数据的到来,此时并没有数据。接着运行客户端程序:

Netty源码阅读(五):选择器

看到5个线程的数据已经发送,此时服务端的执行情况是:

Netty源码阅读(五):选择器

数据全部接收到并打印,看到右边的方框还是红色的,说明这5个线程的数据接收、打印完毕之后,再继续等待着客户端的数据的到来。

总结一下Selector的执行两个关键点:

1、注册一个ServerSocketChannel到selector中,这个通道的作用只是为了监听客户端是否有数据到来(这里注意一下有数据到来,意思是假如需要接收100个字节,如果到来了1个字节就算数据到来了),只要有数据到来,就把特定通道注册到selector中,并指定其事件为读事件

2、ServerSocketChannel和SocketChannel(通道里面的是客户端的数据)共同存在在Selector中,只要有注册的事件到来,Selector取消阻塞状态,遍历SelectionKey集合,继续注册读取数据的通道,或者是从通道中读取数据。

 

选择过程的可扩展性

从上面的代码以及之前对于Selector的解读可以看到,Selector可以简化用单线程同时管理多个可选择通道的实现。使用一个线程来为多个通道提供服务,通过消除管理各个线程的额外开销,可能会降低复杂性并可能大幅提升性能。但只使用一个线程来服务所有可选择的通道是不是一个好主意呢?这要看情况。

对单核CPU的系统而言这可能是一个好主意,因为在任何情况下都只有一个线程能够运行。通过消除在线程之间进行上下文切换带来的额外开销,总吞吐量可以提高。但对于一个多核CPU的系统而言呢?字啊一个有n个CPU的系统上,当一个单一的线程线性轮流地处理每一个线程时,可能有(n-1)个CPU处于空闲状态。

一种可行的解决办法是使用多个选择器。但是请尽量不要这么做,在大量通道上执行就绪选择并不会有很大的开销,大多数工作是由底层操作系统完成的,管理多个选择器并随机地将通道分派给它们当中的一个并不是这个问题的合理的解决方案。

一种更好的解决方案是对所有的可选择通道使用同一个选择器,并将对就绪选择通道的服务委托给其他线程。开发者只使用一个线程监控通道的就绪状态,至于通道处于就绪状态之后又如何做,有两种可行的做法:

1、使用一个协调好的工作线程池来处理接收到的数据,当然线程池的大小是可以调整的

2、通道根据功能由不同的工作线程来处理,它们可能是日志线程、命令/控制线程、状态请求线程等

相关文章:

  • 2021-09-28
  • 2021-09-07
  • 2021-11-19
  • 2022-01-13
  • 2021-07-31
  • 2022-12-23
  • 2022-12-23
猜你喜欢
  • 2021-12-11
  • 2022-12-23
  • 2021-12-01
  • 2022-01-10
  • 2022-01-17
  • 2021-05-31
  • 2021-06-09
相关资源
相似解决方案