在本章巾,我们会分别对 JDK 的BIO ,NIO 和JDK 1.7 最新提供的 NI02.0的使用进行详细说明 ,通过流程图和代 码讲解,让大 家体会到随着 Ja va 1/0 类库的 不断发展和改进,基于 Java 的网络编程会变得越来越简单 ,随着异步 I/0 功 能的增强,基于Java NIO 开发 的网络服务器甚至不逊色于采用 C++开发的网络程序 。

本章主要 内容包括 :

1.传统的 同步阻塞式 1/0 编程

2.基于 NIO 的非阻塞编程

3.基于 NI02 .0 的异步非阻塞 ( AJO )编程

4.为什么要使用 NIO  编程

5.为什 么选择N etty


2.1    传统的 BIO 编程

 网络编程 的基本模型是 Client/Server 模型 ,也就是两个进程 之 间进行相互通信 ,其 中 服务端提供位 置信息 ( 绑定的 IP  地址和监听端口 ) ,客户端通过连接操作 向服务端监听的址发起连接请求 ,通过三次握手建立连接 ,如果连 接建立成功,双方就可以通过 网络套接字 ( Socket ) 进行通信 。

在基于传统同步阻塞模型开发中 ,

Server Socket 负责绑定 IP 地址 ,启动监昕端 口:

Socket 负责发起连接操作 。连接 成功之后 ,双方通过输入 和输 出流进行同步阻塞式通信 。

下雨,我们就 以经典的时间服务器 ( TimeServer ) 为例 ,通过代码 分析来回顾和 熟悉 下 BIO  编程 。

 

2.1 .1           BIO 通信模型图

 首先 ,我 们通过图 2- 1 所示 的通信模型图 来熟悉BIO 的服务端通信模型 :采用 BIO 通信模型的服务端 ,通 常 由一个独立 的 Acceptor 线程负责监听客户端的连 接 ,它接 收到客 户端连接请求之后为每个客户端创建 一个新 的线程进行处理 ,处理完成之后 ,通过 输 出流返 回应答给客户端 ,线程销毁 。这就是 典型的一请求 一应答通信模型

(基础篇 走进javaNIO)第二章-NIO入门

该模型最大的问题就是缺乏弹性伸缩 能力 ,当客户 端并发访问量增加后 ,服 务端的线 程个 数和客户端并发访 问数量. 1 : 1 的正 比关系 ,由于线程 是 Java 虚拟机非常宝贵的系统 资源 ,当线程数膨胀之后 ,系统的 性 能将急剧 下降,随着并发访问量 的继续增大 ,系统会 发生线程堆溢出、创建新线程 失败等问题 ,并最 终导致进程岩机或者僵死,不能对外提 供服 务 。

下面的两个小节 ,我们会 分别对服务端和客户端进行源码分析,寻找同 步阻塞I/0的弊端 。

 

2. 1.2     同步阻塞式 1/0 创建 的 Timeserver 源码分析

 代码 清单 2-1同步 阻塞 I/0 的 TimeServer

( 备注 :以 下代 码行号 均 对应 源 代码 中实 际行号 。)

 

 1 package com.phei.netty.bio;
 2 
 3 import java.io.IOException;
 4 import java.net.ServerSocket;
 5 import java.net.Socket;
 6 
 7 /**
 8  * @author lilinfeng
 9  * @date 2014年2月14日
10  * @version 1.0
11  */
12 public class TimeServer {
13 
14     /**
15      * @param args
16      * @throws IOException
17      */
18     public static void main(String[] args) throws IOException {
19     int port = 8080;
20     if (args != null && args.length > 0) {
21 
22         try {
23         port = Integer.valueOf(args[0]);
24         } catch (NumberFormatException e) {
25         // 采用默认值
26         }
27 
28     }
29     ServerSocket server = null;
30     try {
31         server = new ServerSocket(port);
32         System.out.println("The time server is start in port : " + port);
33         Socket socket = null;
34         while (true) {
35         socket = server.accept();
36         new Thread(new TimeServerHandler(socket)).start();
37         }
38     } finally {
39         if (server != null) {
40         System.out.println("The time server close");
41         server.close();
42         server = null;
43         }
44     }
45     }
46 }

 

 

TimeServer根据传入 的参数设置监 昕端 口,如果没有入 参 ,使用 默认值 8080 ,31行通过构造 函数创建 ServerSocket ,如果端 口合法且没有被占用 ,服 务端监 听成功 。34 ~37行 通 过 一 个 无 限循 环来 监 听客 户 端 的连接 ,如 果 没 有客 户 端 接 入 , 则主 线 程 阻塞 在 ServerSocket  的 accept 操作上 。启动 TimeServer ,通 过 J visuaJVM  打 印线程堆找 ,我们可 以发现主程序确实阻塞在 accept 操作 上 ,如 图 2-2 所示 。

 (基础篇 走进javaNIO)第二章-NIO入门

 

当有新 的客户端接入 的时候 ,执行代码 50 行 ,以 Socket 为参数构造 TimeServerHandler·对象 ,TimeServerHandler 是一个 Runnable,使用它为构造 函数的参数创建一个新的客户端 线程 处理这条 Socket 链路 。下面 我们继续分析 Time ServerHandler 的代码。

代码 清单 2-2    同 步 阻塞 I/0 的 Time ServerHandler

 1 package com.phei.netty.bio;
 2 
 3 import java.io.BufferedReader;
 4 import java.io.IOException;
 5 import java.io.InputStreamReader;
 6 import java.io.PrintWriter;
 7 import java.net.Socket;
 8 
 9 /**
10  * @author Administrator
11  * @date 2014年2月14日
12  * @version 1.0
13  */
14 public class TimeServerHandler implements Runnable {
15 
16     private Socket socket;
17 
18     public TimeServerHandler(Socket socket) {
19     this.socket = socket;
20     }
21 
22 
23     @Override
24     public void run() {
25     BufferedReader in = null;
26     PrintWriter out = null;
27     try {
28         in = new BufferedReader(new InputStreamReader(
29             this.socket.getInputStream()));
30         out = new PrintWriter(this.socket.getOutputStream(), true);
31         String currentTime = null;
32         String body = null;
33         while (true) {
34         body = in.readLine();
35         if (body == null)
36             break;
37         System.out.println("The time server receive order : " + body);
38         currentTime = "QUERY TIME ORDER".equalsIgnoreCase(body) ? new java.util.Date(
39             System.currentTimeMillis()).toString() : "BAD ORDER";
40         out.println(currentTime);
41         }
42 
43     } catch (Exception e) {
44         if (in != null) {
45         try {
46             in.close();
47         } catch (IOException e1) {
48             e1.printStackTrace();
49         }
50         }
51         if (out != null) {
52         out.close();
53         out = null;
54         }
55         if (this.socket != null) {
56         try {
57             this.socket.close();
58         } catch (IOException e1) {
59             e1.printStackTrace();
60         }
61         this.socket = null;
62         }
63     }
64     }
65 }

 

27 行通过 BufferedReader 读取一行 ,如果 己经读到了输入流 的尾部 ,则返 回值为 null , 退 出循环 。如 果读 到 了非空值 ,则对 内容 进 行判 断 ,如 果请求消 息为 查询 时间的指令 ”QUERY TIME ORDER”则获取当前最新 的系统时间,通 过 PrintWriter  的 print ln  函数发送 给客户端 ,最后退 出循环 。代码 44~ 61行释放输入流 、输 出流 、和 Socket 套接字句柄 资 源 ,最后线程 自动销毁并被虚拟机 回收。

在下一个 小结 ,我们将介 绍同步 阻塞 I/0 的客户端代码 ,然后分 别运行服务端和客户 端 ,查看下程序 的运 行结果 。

 

2.1.3     同步阻塞式 1/0 创建 的 TimeClient 源码分析

客户端通过 Socket 创建 ,发送查询时 间服务器 的”QUERY TIME  ORDER 吁:旨令,然 后 读取服务端的 响应并将 结果打 印出来,随后关 闭连接 ,释放资源 ,程序退 出执行 。

代码 清单 2-3      同步 阻塞 I/0 的 TimeClient

 1 package com.phei.netty.bio;
 2 
 3 import java.io.BufferedReader;
 4 import java.io.IOException;
 5 import java.io.InputStreamReader;
 6 import java.io.PrintWriter;
 7 import java.net.Socket;
 8 
 9 /**
10  * @author lilinfeng
11  * @date 2014年2月14日
12  * @version 1.0
13  */
14 public class TimeClient {
15 
16     /**
17      * @param args
18      */
19     public static void main(String[] args) {
20         int port = 8080;
21         if (args != null && args.length > 0) {
22             try {
23                 port = Integer.valueOf(args[0]);
24             } catch (NumberFormatException e) {
25                 // 采用默认值
26             }
27         }
28         Socket socket = null;
29         BufferedReader in = null;
30         PrintWriter out = null;
31         try {
32             socket = new Socket("127.0.0.1", port);
33             in = new BufferedReader(new InputStreamReader(socket.getInputStream()));
34             out = new PrintWriter(socket.getOutputStream(), true);
35             out.println("QUERY TIME ORDER");
36             System.out.println("Send order 2 server succeed.");
37             String resp = in.readLine();
38             System.out.println("Now is : " + resp);
39         } catch (Exception e) {
40             e.printStackTrace();
41         } finally {
42             if (out != null) {
43                 out.close();
44                 out = null;
45             }
46 
47             if (in != null) {
48                 try {
49                     in.close();
50                 } catch (IOException e) {
51                     e.printStackTrace();
52                 }
53                 in = null;
54             }
55 
56             if (socket != null) {
57                 try {
58                     socket.close();
59                 } catch (IOException e) {
60                     e.printStackTrace();
61                 }
62                 socket = null;
63             }
64         }
65     }
66 }

 

 

第 35 行客户端通过 PrintWriter    向服务端发送吨UERY TIME ORDER 指令,然后通过

BufferedReader 的 readLine 读取 响应并打 印。 分别执行服 务端和客户端 ,执 行结果如下 。 服 务端执行结果如 图 2-3 所示 。

(基础篇 走进javaNIO)第二章-NIO入门

客户端执行结果如 图 2-4 所示 。

(基础篇 走进javaNIO)第二章-NIO入门

服务器启动后,TimeServer运行到server.accept(),然后阻塞;当TimeClient的socket = new Socket("127.0.0.1", port);时候,TimeServer的server.accept()通了。运行TimeServerHandler线程

运行时候发现到了 in.readLine()时候阻塞了,等待对方out.println。

 

到此为止 ,同步 阻塞式 I/0 开发 的时间服务器程序 已经讲解完毕 ,我们发现 ,BIO 主 要 的问题在于每当有一个新 的客户端请求接入 时,服 务端必须创建 一个新的线程处理新接 入的客户端链路 ,一个线程只能处理一个客户端连接 。在 高性 能服务器应用领域 ,往往需 要丽 向成千上万个客户端的并发连 接 ,这种模型显然无 法满足高性能、高并发接 入的场景 。

为 了改进一线程一连接模型,后来又演进 出了一种通过线程池或者 消息队列实现 1个 或者多个线程处理 N 个客户端的模型,由于它的底层通信机制依然 使用 同步阻塞 I/0 ,所 以被称为 “ 伪异步,下面章节我们就对伪异步代码进行 分析 ,看看伪异步是否能够 满足 我们对高性能、高并发接入 的诉求 。


 2.2    伪异步 1/0 编程

 

为 了解 决同步 阻塞 I/0 面 临的一个链路需要一个线程处理的问题 ,后来有 人对它 的线 程模型进行 了优化 ,后端通过一个线程池 来处理多个客户端的请求接入 ,形成 客户端个 数 M:线程 池最大线程数 N 的比例关系 ,其中 M 可以远远大于 N,通过线程池可以灵活的 调 配线程资源 ,设置 线程的最大值 ,防止由于海量并发接 入导致线程耗尽 。

下面 ,我们结合连 接模 型图和源码 ,对伪异 步 1/0 进行分析 ,看它是否能 够解 决 同步 阻塞 I/0 面 临的 问题 。

2 .2 .1     伪异步 1/0 模型图

采用线程池和任务队列可以实现 一种 叫做伪异步 的 I/0 通信框 架 ,它 的模型图如 图 2-5         .

所示 。

 (基础篇 走进javaNIO)第二章-NIO入门

当有 新 的客 户 端 接入 的时候 ,将客 户端 的 Socket封装成 一个Task   ( 该任 务 实 现 java.Iang.Runnable 接口 〉 投边到后端的线程池中进行处 理 ,JDK  的线程池维护一个消息队列和 N个活跃线程对消息队列中的任务进行处理 。由于线程池可以设置 消息 队列 的大小和 最大线程数 ,因此 ,它 的资源占用是可控 的,无 论多少个客户端并发 访 问,都 不会导致资 源的耗 尽和省机 。

下面的小节 ,我们依然 采用 时间服务器程序 ,将 其改造成伪异步 1/0 时间服务器 ,然 后通过对代码进行分析 ,找 出其 弊端 。

 2.2.2     伪异步式 1/0 创建 的 Timeserver 源码分析

 1 package com.phei.netty.pio;
 2 
 3 import java.io.IOException;
 4 import java.net.ServerSocket;
 5 import java.net.Socket;
 6 
 7 import com.phei.netty.bio.TimeServerHandler;
 8 
 9 /**
10  * @author lilinfeng
11  * @date 2014年2月14日
12  * @version 1.0
13  */
14 public class TimeServer {
15 
16     /**
17      * @param args
18      * @throws IOException
19      */
20     public static void main(String[] args) throws IOException {
21     int port = 8080;
22     if (args != null && args.length > 0) {
23 
24         try {
25         port = Integer.valueOf(args[0]);
26         } catch (NumberFormatException e) {
27         // 采用默认值
28         }
29 
30     }
31     ServerSocket server = null;
32     try {
33         server = new ServerSocket(port);
34         System.out.println("The time server is start in port : " + port);
35         Socket socket = null;
36         TimeServerHandlerExecutePool singleExecutor = new TimeServerHandlerExecutePool(
37             50, 10000);// 创建IO任务线程池
38         while (true) {
39         socket = server.accept();
40         singleExecutor.execute(new TimeServerHandler(socket));
41         }
42     } finally {
43         if (server != null) {
44         System.out.println("The time server close");
45         server.close();
46         server = null;
47         }
48     }
49     }
50 }

伪异步 I/0 的主 函数代码发 生了变化 ,我们首先 创建一个时间服务器处理类 的线程池 , 当接收到新 的客户端连接 的时候 ,将请求 Socket 封装成一个 ask ,然 后调用线程池 的 execu te 方法执行 ,从而避免 了每个请求接入都创建一个新的线程

代码 清单 2-5   伪异步 IO 的 TimeServerHandlerExecutePool

 1 package com.phei.netty.pio;
 2 
 3 import java.util.concurrent.ArrayBlockingQueue;
 4 import java.util.concurrent.ExecutorService;
 5 import java.util.concurrent.ThreadPoolExecutor;
 6 import java.util.concurrent.TimeUnit;
 7 
 8 /**
 9  * @author Administrator
10  * @date 2014年2月15日
11  * @version 1.0
12  */
13 public class TimeServerHandlerExecutePool {
14 
15     private ExecutorService executor;
16 
17     public TimeServerHandlerExecutePool(int maxPoolSize, int queueSize) {
18     executor = new ThreadPoolExecutor(Runtime.getRuntime()
19         .availableProcessors(), maxPoolSize, 120L, TimeUnit.SECONDS,
20         new ArrayBlockingQueue<java.lang.Runnable>(queueSize));
21     }
22 
23     public void execute(java.lang.Runnable task) {
24     executor.execute(task);
25     }
26 }

由于线相池 和消息队列都是有界的 ,肉此 ,无论客户端并发连接数多大 ,它 都不会 导 致线程个数过于膨胀或者 内有溢 出,相 比于传统的一连接一线程模型 ,是一种改 良。

由于客户端代码 并没有改变 ,因此 ,我们直接运行 服务端和客户端 ,执行结果如 下。

(基础篇 走进javaNIO)第二章-NIO入门

图 2 -6      伪异步 IO时 间服务端运行结果

(基础篇 走进javaNIO)第二章-NIO入门

图 2-7       伪异步 I/0 时间客户端运行结果

 

伪异步 I/0 通信框架采用了线 程地实现 ,因此避 免 了为每个请求都创建一个独立线程 造成的线程资源耗尽问题 。但 是由于它底层 的通信依然采用同步阻塞模型 ,因此无法从根 本上解 决问题 。下个小节我们对伪异 步 1/0 进行深入分析 ,找 到它的弊端 ,然后看看 NIO 是如何从根本上解 决这个 问题的 。

 

2.2.3     伪异步 1/0 弊端分析

 要对伪异步 1/0 的弊端进行深入分析 ,首先我 们看两个 Java 同步 I/0 的 API 说明 ,随 后我们 结合代码进行详细分析 。

1.对输入流分析

(基础篇 走进javaNIO)第二章-NIO入门

(基础篇 走进javaNIO)第二章-NIO入门

请注意力日粗斜体字部 分的 API 说 明,当对 Socket 的输入流进行读取操作的 时候 ,它 会一直阻塞 下去 ,直 到发生如下三种事 件 。

1.有数据 可读

2.可用数据 已经读取完毕

3.发生空指针或者 I/0 异常

这意味着 当对方发送请求或者应答消息比较缓慢 、或者 网络传输较慢时,读 取输入流 一方 的通信线程将被长 时间阳.塞 ,如果对方要 60s 才 能够将数据 发送 完成 ,读取 一方 的 I/0 线程也将会被 同步阻塞 60s ,在此期间 ,其 他接入消息只能在消息队列中排 队

 

下面我们接着对输 出流进行分析 ,还是 看 JDK I/0 类库输 出流的 API 文档 ,然 后结合 文档说 明进行故障分析

2.对输出流分析

(基础篇 走进javaNIO)第二章-NIO入门

 

当调用 Output Stream  的 write 方法写输 出流的时候 ,它将会 被阻塞 ,直到所有 要发送 的字节 全部写入完毕 ,或者发生异常 。学习过 TCP/IP  相关知识的人 都知道 ,当消息的 接 收方处理缓慢 的时’候 ,将 不能及 时地从 TCP  缓冲区读取数据 ,这将会导致发送方 的 TCP window size  不断减小,直 到为 0 ,双 方处于 Keep-Al ive 状态 ,消息发送方将 不能再向 TCP 缓冲区 写入 消息 ,这 时如果采用的 是 同步阻塞 I/0 , write 操作将会被无限期阻 塞 ,直到 TCP window size 大于 0 或者发生 1/0 异常 。

通过对输入和输 出流 的 API 文档进行分析 ,我们 了解到读和写操作都是 同步阻塞 的, 阻塞的时间 取决于对方 I/0 线程 的处理速度和网络 I/0 的传输 速度 。本质上来讲 ,我们无 法保证生产环境 的网络状况和对端的应用程序 能足够快 ,如果我们的应用程 序依赖对 方 的 处理速度 ,它的可靠性就非常差 。也许在实 验室进行的性 能测试结果令人满意 ,但是 一旦 上线运行 ,面对恶劣的 网络环境和 良劳不齐 的第三方系统 ,问题就会如火 山一样喷发 。

 

伪异步 I/0 实际上仅仅只是对之前 I/0 线程模型 的一个简单优化 , 它无法从根本上解 决 同步 I/0 导致 的通信线程 阻塞问题 。下面我们就简 单分析下如果通信对方返回应答时间 过长 ,会 引起 的级联故 障 。

( 1) 服务端处理缓慢 ,返回应答 消息耗 费 60s ,平 时只需要 10ms

( 2 ) 采用伪异步 1/0 的线程正在读取故障服务节 点的响应,由于读取输入流是阻塞的 ,因此,它将会 被 同步阻塞 60s

( 3 ) 假如所有的可用线程都被故障服务苦苦 阻塞 ,那后续所有的 I/0 消息都将在 队列 中 排 队

( 4 ) 由于线程池 采用阻塞队 列实现 ,当队列积满之 后 ,后续入 队列的操作将被阻塞

( 5 ) 由于前端只有一个 Accptor 线程接 收客户端接入 F 它被阻塞在线程池 的同步阻塞 队列之后 ,新 的客户端请求消息将被拒绝 ,客户端会 发生大量 的连接超时

( 6 ) 由于几乎所有 的连接都超时 ,调用者会认为 系统已经崩溃 ,无法接 收新的请求消息。 如何破解这个难题 ?下节的 NIO 将给出答案


 

2.3 NIO 编程

在介绍 NIO 编程之 前 ,我 们首先需要澄清一个概念 :NIO  到底是什么的简称 ?有 人称 之为 New I/0 ,因为 它相对于之前的 I/0 类库是新增的 ,所以被称为 New   I/0,这是它的 官方叫法 。但 是 ,由于之前老的 1/0 类库是 阻塞 1/0 ,   New  I/0 类库的 目标就是要 让 Java 支持非阻塞 I/0 ,所 以,更 多的人喜欢称之为非阻塞 I/0  ( Non -block I/0 ) ,由于非阻塞 I/0 更能够体现 NIO  的特点 ,所 以本书使用的 NIO 都指的是非阻塞 J/0。

Socket 类和J ServerSocket 类相对应 ,NIO 也提供了SocketChannelServerSocketChannel 两种不 同的套接字通道实现 。这两种新增的通道都支持阻塞和非阻塞两种模式 。阻塞 模式 使用非常简单 ,但是 性能和可靠性都不好 ,非阻 塞模式 则正好相反 。开 发 人员一般可 以根 据 自己的需要来选择合适的模式 ,一般来 说 ,低负载 、低 并发 的应用程序可 以选择同 步阻 塞 1/0 以降低编程复杂度 ,但 是对于高负载、高并发 的网络应用 ,需要使用 NIO 的非阳寨 模式进行开发 。

下面 的小节首先介绍 NIO  编程中的一些基 本概念 ,然后通 过 NIO  服务端的序列图和l 源码讲解 ,让大家快速地 熟悉 NIO 编程 的关键步骤和 api的使用 。如果你 已经熟悉了 NIO 编程 ,可 以跳过 2 .3 节直接学 习后面 的章节 。

 

2.3 .1    NIO 类库简介

 新 的输入/输 出 ( NIO ) 库是在 JDK  1.4  中引入的。N IO 弥补了原来同步阻塞 I/0 的不足 ,它在标准 Java 代码中提供 了高速 的、面向 块 的 1/0。通过 定义包含数据的类 ,以及通 过 以块的形式处理这些数据 ,N IO 不用使用本机代码就可 以利用低级优化 ,这是 原来的 I/0 包所 无法做到的。下面我们对 N IO 的一些概念和功 能做下简单介绍 ,以便大 家能够快速地 了解 N IO 类库 和相 关概念 。

 

1.缓冲区 Buffer

我们首先介绍缓冲 区 ( Buffer ) 的概念 ,Buffer   是一个对象 ,它包含一些要写入或者 要读 出的数据 。在 NIO 类库中 加入 Buffer 对象 ,体现了 新库与 原 1/0 的一个重要区别 。

1.在 而 向流 的 1/0 中,可 以将数据 直接写入或者将数据直接读 到 Stream 对象中 。

2.在 N10 库中 ,所 有数据都是用缓 冲 区处理 的。在读取 数据时,它是直接 读到缓冲 区中 的:在 写入数据时,写 入到缓冲 区中。任何 时候访 问 NIO 中的数据 ,都 是边过缓冲 区进行 操作 。

 

缓冲 区实质上 是一个数组 。通常 它是一个字节数组 ( ByteBuffer ) ,也可 以使用其他种 类 的数组 。但 是一个缓 冲 区不仅仅是一个数组 ,缓冲 区提供 了对数据的结构化访 问以及 维 护读写位置 ( l imit ) 等信息 。

最常用的缓冲 区是 ByteBuffer ,一个 ByteBuffer 提供 了一组功能用于操作 byte 数组。 除了 ByteBu ffer ,应 有其他 的一些缓冲区 ,事实上 ,每一种 Java 基本类型 ( 除了 Boolean 类型 〉  都对应有一种缓冲 区,具体如下 。

ByteBuffer :字节缓 冲 区

CharBuffer :字符缓冲区

ShortBuffer :短整型缓 冲区

IntBuffer :整形缓 冲 区

LongBuffer :长整形 缓冲 区

 FloatBuffer :浮 点型缓冲 区

DoubleBuffer :双精度浮 点型缓冲区 缓冲 区的类图 继承关系如 图 2-8 所示 。

 

(基础篇 走进javaNIO)第二章-NIO入门

 

每一个 Buffer 类都是 Buffer 接 口的一个子 实例 。除 了 ByteBuffer ,每 一个 Buffer 类 都有完全一样的操作 ,只是它们所 处理的数据类型不 一样 。因为 大多数标准 1/0 操作都使 用 ByteBuffer ,所 以它除了具有一般缓冲区的操作 之外还提供一些特有的操 作 ,方便网 络 读写。

 

2.   通道 Channel

 Channel  是一个通道 ,可以通 过它读取和 写入数据 ,它就像 自来水管一样 ,网络数据 通过 Channel  读取 和写入通 道与流的不 同之处在于通道是双向 的,流只是 在一个方 向上 移动 ( 一个流必须是 InputStream 或者 OutputStream  的子类),而且 通道可以用于 读 、写或 者 同时用于读写

因为 Ch ann el 是全双工 的,所以 它可以比流更好地映射底层操作系统的 AP I 特别是 在 UNIX 网络编程模型中,底 层操作系统 的通道都是全双工的,同时支 持读写操作。

Channel 的类图继求关系如 图 2-9 所示 。

 自顶向 下看 ,前三层主要 是 Channel 接口 ,用于定义它的功能 ,后面 是一些具体的功能类 ( 抽象类 ),从类 图可 以看出,实 际上 Chann el 可 以分为 两大类 :分别是

1.网络读 写 的 SelectableChannel

2.用于文件操作的 FileChannelo

本书涉及 的 ServerSocketChannel 和 SocketChannel 都是 SelectableChanneJ   的子类 ,关 于它们 的具体用法将在后绥的代码中 体现

 

(基础篇 走进javaNIO)第二章-NIO入门

 

3.   多路复用器 Selector

 在本节中,我们将探索多 路复用器 Selector,它是 Java NIO 编程的基础 ,熟练地 掌握 Selector 对于掌握 NiO 编程至关重要 。多路复用 器提供选择 已经就绪 的任务的 能力 。简单 来讲 ,Selector 会不 断地轮询注册在其上 的 Channel ,如果某个 Channel 上面有新 的 TCP 连接接入 、读 和写事件 ,这个 Channel就处于就绪状态 ,会被Selector 轮询 出来 ,然后通 过 SelectionKey 可 以获取就绪 Channel的集合 ,进行 后续 的 I/0 操作。

一个 多路复用器 Sel ector可以同时轮询多个Ch annel,由于 JDK  使用了 epoll()代替 传统 的 select  实现 ,所 以它并没有最大连接句柄 1024/2048  的限制 。这也 就意味着 只需要一个线程负责 Selector 的轮 词,就可以接入成千上万的客户端,这确实是个非常 巨大的进步 。

下面 ,我 们通 过 NIO 编程的序 列图和源码分析来熟悉相关的概念 ,以便巩 固我们前面 所学的 NIO 基础 知识。

 

2.3.2     NIO 服务端序列图

 (基础篇 走进javaNIO)第二章-NIO入门

 

下面 ,我们对 NIO 服务端的主要创建过程进行讲解和说明 ,作为 NJO 的基础 入 门,

我们将忽略掉一些在生产环境中部署所 需要 的一些特 性和功能。

 

步骤一 :打开 ServerSocketChannel ,用于监听客户端的连接,它是有客户端连接的父管道,代码示例如下 。

 ServerSocketChannel acceptorSvr  =ServerSocketChannel .open ( )  ;

 步骤二 :绑定 监听听端口,设置连接为非阻塞模式 ,示例 代码如 下。

 acceptorSvr.socket ( ) . bind( new InetSocketAddress (InetAddress .getByName("IP"), port) )  ;

acceptorSvr .configureBlockin g(false )  ;

 步骤三 :创建 Reactor线程 ,创建多路复用器并启动线程 ,代码 如下。

Selector   selector    =   Selector .open ( ) ;

newThread ( new  ReactorTask ( ) ) . start ( )  ;

 步骤四 :将 ServerSocketChannel   注册到 Reactor  线程的多路复用器 Selector  上 ,监 听ACCEPT 事件 ,代码如 下 。

 SelectionKey  key  =  acceptorSvr.register(selector , SelectionKey.OP_ACCEPT, ioHand ler)  ;

 步骤五 :多路复用器在线程 run 方法的无 限循环体 内轮询准备就绪的 Key ,代码如下 。

int num = selector. select ( ) ;

Set  selectedKeys =selector. selectedKeys ( ) ;

Iterator it  = selectedKeys . iterator( )  ;

while (it.hasNext ( ))   {

SelectionKey    key   =    ( Se lectionKey ) it . next ( )  ;

// do something

 步骤六 :多路 复用器听到有新 的客户端接入 ,处理新的接入请求 ,完成 TCP 三 次握手 ,建立物理链 路 , 代码示例如下。

 SocketChannel   channel   =   svr Channel.accept ( )  ;

步骤七 :设置客 户端链路为非阻塞模式 ,示例代码 如下 。

channel. configureBlocking(false} ;

channel . socket  ( )  .setReuseAddress ( true)  ;

......

 步骤八 :将新接入的客户端连接注册到 Reactor 线程的多路复用器上,监听读操作 ,用来读取客户端发送的网络消息,代码如 下。

 SelectionKey key  =  socketChannel.register ( selector , SelectionKey . OP_READ , ioHandler )  ;

步骤九 :异步读取 客户端请求消息到缓冲区,示例 代码如下 。

 int readNumber   = channel.read(received.Buffer ) ;

步骤十 :对 ByteBuffer 进行编解码 ,如果有半包 消息指针 reset ,继续读取 后续 的报文 , 将解码成功 的消息封装成 Task ,投递 到业务线程池 中,进 行业务逻辑编排 ,示 例代码如下 。

 Object   message = null;

while ( buffer.hasRemin ( ) )

{

 byteBuffer .mark ( )  ;

Object message = decode ( byteBffer ) ;

if (message == n u ll )

{

 byteBuffer.reset ( )  ;

break ;

}

 messageList.add(message) ;

 }

if ( !byteBuffer.ha sRemain ( ) )

byteBffer.clear( ) ;

else

byteBffer. cornpact ( )  ;

if    ( messageList!=null&!me ssageList . isEmpty())

{

for (Object rnessage E : messageList )

{

handlerTask ( messageE )  ;

 步骤十一 :将 POJO 对象 encode 成 byteBffer,调用 SocketChannel 的异步 write 接 口, 将消息异步发送给客户端 ,示例代码如下 。

 socketChannel.write(buffer);

 注意:如果 发送 区 TCP 缓冲 区满 ,会导致写半包 ,此 时 ,需要注册监 昕写操 作位 , 循环写 ,直 到整包消息写入 TCP 缓冲区 ,此处不赘述 ,后续 Netty 源码分析章节会详细 分 析 Netty    的处理策略。

 

当我们 了解创建 NIO 服务端 的基本步 骤之后 ,下面 我们将前面 的时间服务器程序通过

NIO 重写一遍 ,让大 家能够 学 习到完整版 的 NIO 服 务端创建 。

 

2 .3.3   NIO 创建 的 Timeserver 源码分析

 我们将在 TimeServer 例程中给 出完整的 N10 创建 的时间服务器源码 。

代码 清单 2-8  NIO   时 间服务器 TimeServer

 1 package com.phei.netty.nio;
 2 
 3 import java.io.IOException;
 4 
 5 /**
 6  * @author lilinfeng
 7  * @date 2014年2月14日
 8  * @version 1.0
 9  */
10 public class TimeServer {
11     /**
12      * @param args
13      * @throws IOException
14      */
15     public static void main(String[] args) throws IOException {
16     int port = 8080;
17     if (args != null && args.length > 0) {
18         try {
19         port = Integer.valueOf(args[0]);
20         } catch (NumberFormatException e) {
21         // 采用默认值
22         }
23     }
24     MultiplexerTimeServer timeServer = new MultiplexerTimeServer(port);
25     new Thread(timeServer, "NIO-MultiplexerTimeServer-001").start();
26     }
27 }

我们对 N IO 创建 的 Tjm eServer 进行简单 分析下 ,16~23 行跟之前的 一样 ,设置监听 端 口。24~25 行创建了一个被称为 Multipl exerTimeServer  的多路 复用类 ,它是个一个独立 的线程 ,负责轮 询多i路 复用器 Seletor ,可 以处理多个 客户端的并发接 入 。

现在我们继续 看 MultiplexerTimeServer的源码

  1 package com.phei.netty.nio;
  2 
  3 import java.io.IOException;
  4 import java.net.InetSocketAddress;
  5 import java.nio.ByteBuffer;
  6 import java.nio.channels.SelectionKey;
  7 import java.nio.channels.Selector;
  8 import java.nio.channels.ServerSocketChannel;
  9 import java.nio.channels.SocketChannel;
 10 import java.util.Iterator;
 11 import java.util.Set;
 12 
 13 /**
 14  * @author Administrator
 15  * @date 2014年2月16日
 16  * @version 1.0
 17  */
 18 public class MultiplexerTimeServer implements Runnable {
 19 
 20     private Selector selector;
 21 
 22     private ServerSocketChannel servChannel;
 23 
 24     private volatile boolean stop;
 25     /**
 26      * 初始化多路复用器、绑定监听端口
 27      * 
 28      * @param port
 29      */
 30     public MultiplexerTimeServer(int port) {
 31     try {
 32         selector = Selector.open();
 33         servChannel = ServerSocketChannel.open();
 34         servChannel.configureBlocking(false);
 35         servChannel.socket().bind(new InetSocketAddress(port), 1024);
 36         servChannel.register(selector, SelectionKey.OP_ACCEPT);
 37         System.out.println("The time server is start in port : " + port);
 38     } catch (IOException e) {
 39         e.printStackTrace();
 40         System.exit(1);
 41     }
 42     }
 43 
 44     public void stop() {
 45     this.stop = true;
 46     }
 47 
 48     /*
 49      * (non-Javadoc)
 50      * 
 51      * @see java.lang.Runnable#run()
 52      */
 53     @Override
 54     public void run() {
 55     while (!stop) {
 56         try {
 57         selector.select(1000);
 58         Set<SelectionKey> selectedKeys = selector.selectedKeys();
 59         Iterator<SelectionKey> it = selectedKeys.iterator();
 60         SelectionKey key = null;
 61         while (it.hasNext()) {
 62             key = it.next();
 63             it.remove();
 64             try {
 65             handleInput(key);
 66             } catch (Exception e) {
 67             if (key != null) {
 68                 key.cancel();
 69                 if (key.channel() != null)
 70                 key.channel().close();
 71             }
 72             }
 73         }
 74         } catch (Throwable t) {
 75         t.printStackTrace();
 76         }
 77     }
 78 
 79     // 多路复用器关闭后,所有注册在上面的Channel和Pipe等资源都会被自动去注册并关闭,所以不需要重复释放资源
 80     if (selector != null)
 81         try {
 82         selector.close();
 83         } catch (IOException e) {
 84         e.printStackTrace();
 85         }
 86     }
 87 
 88     private void handleInput(SelectionKey key) throws IOException {
 89 
 90     if (key.isValid()) {
 91         // 处理新接入的请求消息
 92         if (key.isAcceptable()) {
 93         // Accept the new connection
 94         ServerSocketChannel ssc = (ServerSocketChannel) key.channel();
 95         SocketChannel sc = ssc.accept();        //accept后相当于完成了tip的3次握手,物理链路建立
 96         sc.configureBlocking(false);
 97         // Add the new connection to the selector
 98         sc.register(selector, SelectionKey.OP_READ);
 99         }
100         if (key.isReadable()) {
101         // Read the data
102         SocketChannel sc = (SocketChannel) key.channel();
103         ByteBuffer readBuffer = ByteBuffer.allocate(1024);
104         int readBytes = sc.read(readBuffer);//设置了sockChannel为非阻塞所以要判断返回码
105         if (readBytes > 0) {
106             readBuffer.flip();
107             byte[] bytes = new byte[readBuffer.remaining()];
108             readBuffer.get(bytes);
109             String body = new String(bytes, "UTF-8");
110             System.out.println("The time server receive order : "
111                 + body);
112             String currentTime = "QUERY TIME ORDER"
113                 .equalsIgnoreCase(body) ? new java.util.Date(
114                 System.currentTimeMillis()).toString()
115                 : "BAD ORDER";
116             doWrite(sc, currentTime);
117         } else if (readBytes < 0) {
118             // 对端链路关闭
119             key.cancel();
120             sc.close();
121         } else
122             ; // 读到0字节,忽略
123         }
124     }
125     }
126 
127     private void doWrite(SocketChannel channel, String response)
128         throws IOException {
129     if (response != null && response.trim().length() > 0) {
130         byte[] bytes = response.getBytes();
131         ByteBuffer writeBuffer = ByteBuffer.allocate(bytes.length);
132         writeBuffer.put(bytes);
133         writeBuffer.flip();
134         channel.write(writeBuffer);
135     }
136     }
137 }

由于这个类相比于传统 的 Socket 编程会稍微复杂一些,在此展开进行详细 分析,我 们从如下儿个关键步骤来 讲解 多路复用处理类 。

 

( 1 ) 30~42 行为 构造方法 ,在 构造方法中进行资源初始化 ,创 建多路复用器 Selector 、ServerSocketChannel ,对 Channel 和TCP参数进行配置 。例如将 ServerSocketChannel设 置 为 异步非阻塞 模式 ,它的 backlog 设置为 1024 。系统资源初 始化成功 后 ,将 ServerSocketChannel 注册到 Selector ,监 听 Selection Key.OP_ACCEPT 操作位 :如果资源初始化失败 〈 例 如端口被占用 ) ,则退 出。

( 2 ) 55~77 行在线程的 run  方法 的 while  循环体 中循环遍历 selector ,它的休眠 时间为1s,无 论是否有读 写等事件发生 ,selector 每隔 1s 都被唤醒一次 ,selector 也提供 了一个无 参 的 select 方法 。当有处 于就绪状态 的 Channel 时 ,selector 将返回就绪状态的 Channel 的SelectionKey   集合 ,通过对就 绪状态的 Ch annel 集合进行迭代 ,可 以进行 网络的异步读写 操作 。

( 3 )92~ 99 行处理新接 入 的客户端请求消息 ,根据 SelectionKey  的操作位 进行判 断即 可获知 网络事件 的类型 ,通 过 ServerSocketChannel  的 accept  接收客户端的连接请求并创 建 SocketChannel 实例 ,完成上述操作后 ,相当于完成了 TCP 的三 次握手 ,TCP 物理链路 正式建立 。注意 ,我们 需要将新创建 的 SocketChannel 设置为异步非阻塞 ,同时也可以对 其TCP参数进行设置,例如TCP 接收和发送缓冲区的大小等,作为入门的例子,没有进行额外的参数设置 。

( 4 )100 ~125 行用于读取客户端的请求 消息 ,首先创建 一个 ByteBuffer , 由于我们事先无法得知客户端发送 的码流大小,作为例程 ,我们 开辟一个1K 的缓冲 区。然 后调用 SocketChannel 的 read 方法读 取请求码流 。注意,由于我 们 已经将 SocketChannel 设置 为异 步非阻 塞模式 ,因此它的 read 是非阻塞 的。使用返 回值进行判断 ,看读取到的字 节数 ,退 回值有 以下三 种可 能的结果 。

1.返回值大于 0:读到 了字节 ,对字 节进行编解码

2.返 回值等于 0 :没有读 取到字节 ,属 于正常场景 ,忽略

3.返 回值为-1 :链路 已经关闭 ,需要关 闭 SocketChannel,释放资源

 

当读取到码流 以后 ,我们进行 解码,首先对 readBuffer 进行filp操作 ,它的 作用是将 缓冲区 当前的 limit 设置 为 position,position设置 为 0,用于后续对 缓冲区的读取操作 。然 后根据缓冲 区可读的字节个数创建字节数组 ,调用 ByteBuffer的 get  操作将缓冲 区可读 的 字节数 组复制到新 创建 的字节数组 中,最 后调用字符 串的构造 函数创建请求消息并打印。如果请求指令 是”QUERY TIME ORDER” 则把服务榕 的当前时间编码后返回给 客 户端 , 下面我们看看异步发送应答消息给 客户端的情况 。

( 5 )  127~135 行将应答消息异步发送给客户端 。我们看下关键 代码 ,首先将字 符 串编 码成字 节数组 ,根 据 字节数组的容量创建 ByteBuffer ,调 用ByteBuffer的 put 操作将字节数组复制到缓 冲 区中 ,然后对 缓冲区j进 行 flip 操作 ,最后 调用 SocketChannel  的 write 方法 将缓冲 区中的字节数组发送 出去。需要指出的是 ,由于 SocketChannel  是 异步非阻塞 的, 它并不保证一 次能够把 需要发送 的字节数组发送完 ,此 时会 出现 “ 写半包 ” 问题,我们 需 要 注 朋 写 操 作 ,不 断轮 询 Selector   将没 有 发送 完 的 ByteBuffer   发送 完 毕 ,可 以通 过 ByteBu ffer  的 hasRema ining()方法判断 消息是否发送完成 。此处仅仅是个简单 的入 门级例 程 ,没有演 示如何处理 “ 写半包 ” 场景 ,后续的章节会有 详细说 明。

使用 NIO 创建 TimeServer 服务器完成之后,我们继续学 习如伺 创建 NIO 客户端 。芮 先迹是通过时序图 了解关键步骤和过程 ,然后结合代码进行详细分析 。

 


 

2.3.4     NIO 客户端序列图

NIO 客户端创建序列图如 图 2- 1 1 所示

 

(基础篇 走进javaNIO)第二章-NIO入门

 

步骤一 :打开 SocketChannel ,绑定客户端本地地址(可选 ,默认系统会随机分配一 个可用的本地地址)
SocketChannel clientChannel=SocketChannel.open();

步骤 二 :设置SocketChannel 非阻塞模式,同时设置客户端连接的TCP参数
clientChannel.configureBlocking(false);
socket.setReuseAddess(true);
socket.setReceiveBu£ferSize(BUFFERSIZE);
socket.setSendBufferSize(BUFFERSIZE)  ;

步骤三 :异步连接服务端 ,示例代码如下 。
boolean    connected= clientChannel.connect(new InetSocketAddress("IP",port));

步骤四:判断是否连接成功如果连接成功,则直接注册读状态位到多路复用器中如果当前没有连接成功(
异步连接返回 false,说明客户端已经发送 sync包,服务端没 有返 回ack包,物理链路还没有建立 )
if(connected)
clientChannel.register(selector,SelectionKey .OP_READ,ioHandler );
else
clientChannel.register(selector,SelectionKey .OP_CONNECT ,ioHandler );

步骤五 :向 Reactor 线程的多路复用器注册 OP_CONNECT 状态位 ,监 昕服务端 的 TCP
ACK 应答 , 示例代码如下 。
clientChannel.register(selector,SelectionKey .OP_CONNECT ,ioHandler );

步骤六 :创建Reactor 线程 ,创建多路复用器并启动线程
Selector selector=selector.open ( ) ;
new    Thread ( new ReactorTa sk ( ) ).start ( ) ;

步骤七 :多路 复用器在线 程 run 方法的无 限循环 体 内轮询准备 就绪的 Key

int    num    =selector .select( )  ;
Set    selectedKeys=selector.selectedKeys( )  ;
iterator it    =selected.Keys.itera tor ( );
while(it.hasNext( )){
SelectionKey key=(SelectionKey) it.next( );
//. . .    do something

步骤八 :接 收 connect 事件进行 处理
if(key.isConnectable()) {
//do thing
}

步骤丸 :判 断连接结果 ,如果连接成功 ,注册读事件到多路 复用器
if(channel.finishConnect( ))
registerRead ( )  ;

步骤十 :注册读事件到多路复用器
clientChannel.register(selector,SelectionKey.OP_READ,ioHandler )  ;

步骤十一 异步读 客户端请求消息到缓冲区 示例代码如 下 。
int    readNumber=channel.readreeceivedBuffer );

 
步骤十二 :对 ByteBuffer进行编解码如果有半包消息接收缓冲区Reset继续读取后续的报文 将解 码成功的消息封装成
Task ,投递到业务线程池中进行业务逻辑编排

(基础篇 走进javaNIO)第二章-NIO入门

(基础篇 走进javaNIO)第二章-NIO入门

 

步骤十三 :将 POJO 对象 encode 成 ByteBuffer ,调用 Sock巳tChannel 的异步 write 接口 , 将消息异步发送给客户端

socketChannel .write (buffer )  ;

通过序列图和关键代码 的解说 ,相信大 家对创建 NIO 客户端程序 已经有了一个初步的 了解 ,下面就跟 随着 我们的脚步 ,继续看看 如何使用 NIO 改造之前的时间服务器客户端 TimeClient 吧。

 

2.3 .5  NIO 创建 的 TimeClient 源码分析

 我们首先还是看下如何对 TimeClient 进行 改造 。

代码 清羊 2-9 NIO 时间服务器客户 端 TimeClient

 1 package com.phei.netty.nio;
 2 
 3 /**
 4  * @author lilinfeng
 5  * @date 2014年2月14日
 6  * @version 1.0
 7  */
 8 public class TimeClient {
 9 
10     /**
11      * @param args
12      */
13     public static void main(String[] args) {
14 
15     int port = 8080;
16     if (args != null && args.length > 0) {
17         try {
18         port = Integer.valueOf(args[0]);
19         } catch (NumberFormatException e) {
20         // 采用默认值
21         }
22     }
23     new Thread(new TimeClientHandle("127.0.0.1", port), "TimeClient-001")
24         .start();
25     }
26 }

与之前唯一不 同的地方在于通过创建 TimeClientHandle 线程来处理异步连接和读写操 作 ,由于 TimeClient 非常简单且变更不大 ,

这里重 点分析 TimeCli entHandle ,代码如 下。

 

  1 package com.phei.netty.nio;
  2 import java.io.IOException;
  3 import java.net.InetSocketAddress;
  4 import java.nio.ByteBuffer;
  5 import java.nio.channels.SelectionKey;
  6 import java.nio.channels.Selector;
  7 import java.nio.channels.SocketChannel;
  8 import java.util.Iterator;
  9 import java.util.Set;
 10 /**
 11  * @author Administrator
 12  * @date 2014年2月16日
 13  * @version 1.0
 14  */
 15 public class TimeClientHandle implements Runnable {
 16     private String host;
 17     private int port;
 18     private Selector selector;
 19     private SocketChannel socketChannel;
 20     private volatile boolean stop;
 21 
 22     
 23     public TimeClientHandle(String host, int port) {
 24     this.host = host == null ? "127.0.0.1" : host;
 25     this.port = port;
 26     try {
 27         selector = Selector.open();
 28         socketChannel = SocketChannel.open();
 29         socketChannel.configureBlocking(false);
 30     } catch (IOException e) {
 31         e.printStackTrace();
 32         System.exit(1);
 33     }
 34     }
 35 
 36     /*
 37      * (non-Javadoc)
 38      * 
 39      * @see java.lang.Runnable#run()
 40      */
 41     @Override
 42     public void run() {
 43     try {
 44         doConnect();
 45     } catch (IOException e) {
 46         e.printStackTrace();
 47         System.exit(1);
 48     }
 49     while (!stop) {
 50         try {
 51         selector.select(1000);
 52         Set<SelectionKey> selectedKeys = selector.selectedKeys();
 53         Iterator<SelectionKey> it = selectedKeys.iterator();
 54         SelectionKey key = null;
 55         while (it.hasNext()) {
 56             key = it.next();
 57             it.remove();
 58             try {
 59             handleInput(key);
 60             } catch (Exception e) {
 61             if (key != null) {
 62                 key.cancel();
 63                 if (key.channel() != null)
 64                 key.channel().close();
 65             }
 66             }
 67         }
 68         } catch (Exception e) {
 69         e.printStackTrace();
 70         System.exit(1);
 71         }
 72     }
 73 
 74     // 多路复用器关闭后,所有注册在上面的Channel和Pipe等资源都会被自动去注册并关闭,所以不需要重复释放资源
 75     if (selector != null)
 76         try {
 77         selector.close();
 78         } catch (IOException e) {
 79         e.printStackTrace();
 80         }
 81     }
 82 
 83     private void handleInput(SelectionKey key) throws IOException {
 84 
 85     if (key.isValid()) {
 86         // 判断是否连接成功
 87         SocketChannel sc = (SocketChannel) key.channel();
 88         if (key.isConnectable()) {
 89         if (sc.finishConnect()) {
 90             sc.register(selector, SelectionKey.OP_READ);
 91             doWrite(sc);
 92         } else
 93             System.exit(1);// 连接失败,进程退出
 94         }
 95         if (key.isReadable()) {
 96         ByteBuffer readBuffer = ByteBuffer.allocate(1024);
 97         int readBytes = sc.read(readBuffer);
 98         if (readBytes > 0) {
 99             readBuffer.flip();
100             byte[] bytes = new byte[readBuffer.remaining()];
101             readBuffer.get(bytes);
102             String body = new String(bytes, "UTF-8");
103             System.out.println("Now is : " + body);
104             this.stop = true;
105         } else if (readBytes < 0) {
106             // 对端链路关闭
107             key.cancel();
108             sc.close();
109         } else
110             ; // 读到0字节,忽略
111         }
112     }
113     }
114 
115     private void doConnect() throws IOException {
116     // 如果直接连接成功,则注册到多路复用器上,发送请求消息,读应答
117     if (socketChannel.connect(new InetSocketAddress(host, port))) {
118         socketChannel.register(selector, SelectionKey.OP_READ);
119         doWrite(socketChannel);
120     } else
121         socketChannel.register(selector, SelectionKey.OP_CONNECT);
122     }
123 
124     
125     private void doWrite(SocketChannel sc) throws IOException {
126     byte[] req = "QUERY TIME ORDER".getBytes();
127     ByteBuffer writeBuffer = ByteBuffer.allocate(req.length);
128     writeBuffer.put(req);
129     writeBuffer.flip();
130     sc.write(writeBuffer);
131     if (!writeBuffer.hasRemaining())
132         System.out.println("Send order 2 server succeed.");
133     }
134 
135 }

与服务端类似,接下来我们通过对关键步骤的源码进行分析和解读,让大家深入了解如何创建NIO客户端以及如(可使用NIO的APl)

(l)23~34行构造函数用于初始化NIO的多路复用器和SocketChannel对象。需要注意的是,创建SocketChannel之后,需要将其设置为异步非阻塞模式。就像在2.3.3章节中所讲的,我们可以设置SocketCbannel的TCP参数,例如接收和发送的TCP缓冲区大小。

(2)43~48行用于发送连接请求,作为示例,连接是成功的,所以不需要做重连操作,因此将其放到循环之前。下面我们具体看看doConnect的实现,代码跳到第116~123行,首先对SocketChannel的connect()操作进行判断,如果连接成功,则将SocketChannel注册到多路复用器Selector上贯注册SelectionKey.OP_READ,如果没有直接连接成功,则说明服务端没有返回TCP握手应答消息,但这并不代表连接失败,我们需要将SocketChannel注册到多路复用器Selector上,注册SelectionKey.OP_CONNECT,当服务端返回TCP syn-ack消息后,Selector•就能够轮询到这个SocketChannel处于连接就绪状态。

(3)49~72行在循环体中轮询多路复用器Selector,当有就绪的Channel    时’,执行第59行的har1dlelnput(key)方法,下面我们就对handlelnput方法进行分析。

(4)跳到第83行,我们首先对SelectionKey进行判断,看它处于什么状态。如果是处于连接状态,说明服务端己经返回ACK应答消息。这时我们需要对连接结果进行判断,调用SocketChannel的finishConnect()方法,如果返回值为true,说明客户端连接成功;如果返回值为false或者直接抛出IOException,说明连接失败。在本例程中返回值为true,
说明连接成功。将SocketChannel注册到多路复用器上,注册SelectionKey.OP_READ操作位,监听网络读操作,然后发运请求消息给服务端。

下面我们对doWrite(sc)进行分析。代码跳到125行,我们构造请求消息体,然后对其编码,写入到发送缓冲区中,最后调用SocketCbannel的write方法进行发送。由于发送是异步的,所以会存在“半包写”问题,此处不再赘述。最后通过hasRemaining)方法对发送结果进行判断,如果缓冲区巾的消息、全部发送完成,打印"Sendorder2serversucceed."

(5)返代码第95行,我们继续分析客户瑞是如何读取时间服务器应答消息的。如果客户端接收到了服务端的应答消息,则SocketChannel是可读的,由于无法事先判断应答码流的大小,我们就预分配lM的接收缓冲区用于读取应答消息,调用SocketCha11nel的read()方法迸行异步读取操作。由于是异步操作,所以必须对读取的结果进行判断,这部分的处理逻辑已经在2.3.3章节详细介绍过,此处不再赘述。如果读取到了消息,则对消息进行解码,最后打印结果。执行完成后将stop置为true,线程退出循环。

(6)线程退出循环后,我们需要对连接资源进行释放,以实现“优雅退出”。75~80行用于多路复用器的资源释放,由于多路复用器上可能注册成千上万的Channel或者pipe,如果一一对这些资源选行释放显然不合适。因此,JDK底层会自动释放所有跟此多路复用器关联的资源,JDK的APIDOC如图2-12所示。
(基础篇 走进javaNIO)第二章-NIO入门


到此为止,我们已经通过NlO对时间服务器完成了改造,并对源码进行了分析和解读,下面分别执行时间服务器的服务端和客户端看执行结果。

 (基础篇 走进javaNIO)第二章-NIO入门

服务端执行结果如图 2- 13 所示

 (基础篇 走进javaNIO)第二章-NIO入门

客户端执行结果如 图 2- 14 所示 。

 

 

通过源码对比分析,我们发现NlO编程难度确实比同步阻塞BIO大很多,我们的NIO例程并没有考虑“半包读”和“半包写”,如果加上这些,代码将会更加复杂。NIO代码既然这么复杂,为什么它的应用却越来越广泛呢,使用NIO编程的优点总下。

(l)客户端发起的连接操作是异步的,可以通过在多路复用器注册OPCONNECT等待后续结果,不需要像之前的客户端那样被同步阻塞。
(2)    SocketCbannel的读写操作都是异步的,如果没有可读写的数据它不会同步等待,直接返回,这样I/0通信线程就可以处理其他的链路,不需要同步等待这个链路可用。
(3)线程模型的优化:由于JDK的Selector在Linux等主流操作系统上通过epoll实现,它没有连接句柄数的限制(只受限于操作系统的最大句柄数或者对单个进程的句柄限制〉,这意味着一个Selector线程可以同时处理成千上万个客户端连接,而且性能不会随着客户端的增加而线性下降,因此,它非常适合做高性能、高负载的网络服务器。
JDl(l.7    升级了NIO类库升级后的NIO    类库被称为NI02.0,引人注目的是,Java正式提供了异步文件1/0操作,同时提供了与UNIX网络编程事件驱动I/0对应的ACO,下面的2.4章节我们学习下如何利用NI02.0编写AIO程序,还是以时间服务器为例进行讲解。


 

2.4    AIO编程

NI02.0   引入了新的异步通道的概念,并提供了异步文件通道和异步套接字通道的实现。异步通道提供两种万式获取获取操作结果。

1.通过java.util.concurrent.Future类来表示异步操作的结果;

2.在执行异步操作的时候传入一个java.nio.channels

CompletionHandler接口的实现类作为操作完成的回调。


NI02.0的异步套接宇通道是真正的异步非阻塞I/0,它对应unix网络编程中的事件驱动I/0(AIO),它不需要通过多路复用器(Selector)对注册的通道进行轮询操作即可实现异步读写,从而简化了NIO的编程模型。


下面通过代码来熟悉N102.0AlO的相关类库,仍旧以时间服务器为例程进行讲解

 

 


 

2.5    4种1/0的对比

2.5.1    概念澄清

为了防止由于对一些技术概念和术语的理解或者叫法不一致而引起歧义,本小节特意对本书中的专业术语或者技术用语做下声明:如果它们与其他一些技术书籍的称呼不一致,请以本小节的解释为准。

1.异步非阻塞1/0

很多人喜欢将JDKl.4提供的NIO框架称为异步非阻塞1/0,但是,如果严格按照UNIX网络编程模型和JDK的实现进行区分,实际上它只能被称为非阻塞1/0不能叫异步非阻塞1/0。在早期的JDK1.4版本之前,JDK的Selector基于select/poll模型实现,它是基于I/0复用技术的非阻塞1/0,不是异步I/0。在JDKl.5updatclO和Linux core2.6以上版本,Sun优化了Selctor的实现,它在底层使用epoll替换了select/poll,上层的API并没有变化,可以认为是JDKNIO的一次性能优化,但是它仍旧没有改变1/0的模型。相关优化的官方说明如图2-17所示。


由JDKl.7提供的N102.0,新增了异步的套接字通道,它是真正的异步1/0,在异步1/0
操作的时候可以传递信号变量,当操作完成之后会回调相关的方法,异步I/0也被称为AlO。


NIO类库支持非阻塞读和写操作,相比于之前的同步阻塞读和写,它是异步的,因此很多人习惯于称NIO为异步非阻塞I/0,包括很多介绍NIO编程的书籍也沿用了这个说法。为了符合大家的习惯,本书也会将NIO称为异步非阻塞I/0或者非阻塞I/0,请大家理解,不要过分纠结在一些技术术语的咬文嚼字上。
(基础篇 走进javaNIO)第二章-NIO入门

 

2.    多路复用器Selector

几乎所有的中文技术书籍都将Selector翻译为选择器,但是实际上我认为这样的翻译并不恰当,选择器仅仅是字面上的意思,体现不出Selector的功能和特点。
在前面的章节我们介绍过JavaNIO的实现关键是多路复用io技术,多路复用的核心就是通过Selector来轮询注册在其上的Channel,当发现某个或者多个Channel处于就绪状态后,从阻塞状态返回就绪的Channel的选择键集合,进行iO操作。由于多路复用器是NO实现非阻塞I/0的关键,它又是主要通过Selector实现的,所以本书将Selector翻译为多路复用器,与其他技术书籍所悦的选择器是同一个东西,请大家了解。

3.    伪异步1/0

伪异步1/0的概念完全来源于实践。在JDKNO编程没有流行之前,为了解决Tomcat通信线程同步1/0导致业务线程被挂住的问题,大家想到了一个办法:在通信线程和业务线程之间做个缓冲区,这个缓冲区用于隔离I/0线程和业务线程间的直接访问,这样业务线程就不会被I/0线程阻塞。而对于后端的业务侧来说,将消息或者Task放到线程池后就返回了,它不再直接访问I/0线程或者进行1/0读写,这样也就不会被同步阻塞。类似的设计还包括前端启动一组线程,将接收的客户端封装成Task,放到后端的线程池执行,用于解决一连接一线程问题。像这样通过线程池做缓冲区的做法,本书中习惯于称它为伪异步I/0,而官方并没有伪异步Io这种说法,请大家注意。


下面的小节我们对几种常见的/0进行对比,以便大家能够理解几种I/0的差异。

 

2.5.2    不同1/0模型对比

不同的I/0模型由于线程模型、Api等差别很大,所以用法的差异也非常大。由于之前的几个小节已经集中对这几种I/0的API和用法进行了说明,本小节会重点对这几种I/0进行功能对比。如表2-1。
表2-1    几种1/0模型的功能和特性对比

 

 

 

同步阻塞 1/0  ( B10)

伪异步 1/0

非阻塞 1/0 C NIO)

M: l ( I 个 l/0 线程处理 多个客户端连接〉

异步 1/0  (AIO)

 

客户 端个数 :1/0 线程

 

1:   1

M:  N ( 某中M 可

以大于 N)

M : 0 ( 不需要 启动额外

的 i/0 线程,被动回调〉

I/0 类型 ( 阻塞)

阻塞 1/0

阻塞 io

非阻塞 io

 

同步 1/0  (i0 多路复用〉

非阻塞 io

io 类型 (司步)

同步 1/0

同步 1/0

异步  io

API 使用难度

简单

简单

非常复杂

复杂

调试难度

简单

简单

复杂

复杂

可靠性

非常差

吞吐量

 尽管本书是专门介绍NIO框架Netty的,但是,并不意味着所有的Java网络编程都必须要选择NlO和Netty,具体选择什么样的I/0模型或者NIO框架,完全基于业务的实际应用场景和性能诉求,如果客户端并发连接数不多,周边对接的网元不多,服务器的负载也不重,那就完全没必要选择NIO做服务端:如果是相反情况,那就要考虑j选择合适的NIO框架进行开发。
对比完Java的几种主流1/0模型之后,我们继续看下为什么要选择Ne时进行NlO开发,而不是直接使用JDK的NIO原生类库。



2.6    选择Netty的理由

在开始本节之前,我先讲一个亲身经历的故事:曾经有两个项目组同时用到了NIO编程技术,一个项目组选择自己开发NIO服务端,直接使用JDK原生的Api,结果两个多月过去了,他们的NIO服务端始终无法稳定,问题频出。由于NIO通信是它们的核心组件之一,因此,项目的进度受到了严重的影响,领导对此也非常恼火。另一个项目组直接使用Netty作为NIO服务端,业务的定制开发工作量非常小,测试表明,功能和性能都完全达标,项目组儿乎没有在NIO服务端上花费额外的时间和精力,项目迸展也非常顺利。
这两个项目组的不同遭遇告诉我们:开发出高质量的NIO程序并不是一件简单的事情,除去NIO固有的复杂性和BUG不谈,作为一个NIO服务端,需要能够处理网络的闪断、客户端的重复接入、客户端的安全认证、消息的编解码、半包读写等情况,如果你没有足够的NIO编程经验积累,一个NIO框架的稳定往往需要半年甚至更氏的时间。更为糟糕的是,一旦在生产环境中发生问题,往往会导致跨节点的服务调用中断,严重的可能会导致整个集群环境都不可用,需要重启服务器F这种非正常停机会带来巨大的损失。
从可维护性角度看,由于NIO采用了异步非阻塞编程模型,而且是一个I/0线程处理多条链路,它的调试和跟踪非常麻烦,特别是生产环境中的问题,我们无法进行有效的调试和跟踪,往往只能靠一些日志来辅助分析,定位难度很大。

2.6.1    不选择Java原生NIO编程的原因

现在我们总约一下为什么不建议开发者直接使用JDK的NIO类库进行开发,具体原因如下。
(1)NIO的类库和IAPI繁杂,使用麻烦, 你需要熟练掌握Selector、ServerSocketChannel、SocketCl1annel、ByteBuffer等。
(2)需要具备其他的额外技能做铺垫,例如熟悉Java多线程编程。这是因为NIO编程涉及到Reactor模式,你必须对多线程和网路编程非常熟悉,才能编写出高质量的NIO程序。
(3)可靠性能力不足,工作量和难度都非常大。例如客户端面临断连重连、网络闪断、半包读写、失败缓存、网络拥塞和异常码流的处理等问题,NIO编程的特点是功能开发相对容易,但是可靠性能力补齐的工作量和难度都非常大。
(4)JDKNIO的BUG,例如臭名昭著的epollbug,它会导致Selector空轮闹,最终导致CPU100%。官方声称在JDKJ.6版本的update18修复了该问题,但是直到JDKl.7版本该问题仍旧存在,只不过该BUG发生概率降低了一些而己,它并没有被根本解决。该BUG以及与该BU-G相关的问题单可以参见以下链接内容。


http://bugs.java.com/bugdatabase/view_b,ug.do?bug一id=6403933

http://bugs.java.com/bugdatabase/view_bug.do?bug_id=2l47719
(基础篇 走进javaNIO)第二章-NIO入门


由于上述原园,在大多数场景下,不建议大家直接使用JDK的NIO类库,除非你精通NIO编程或者有特殊的需求。在绝大多数的业务场景中,我们可以使用NTO框架Netty来进行NIO编程,它既可以作为客户端也可以作为服务端,同时支持UDP和异步文件传输,功能非常强大。
下个小节我们就看看为什么选择Netty作为基础通信框架。

2.6.2    为什么选择Netty

Netty是业界最流行的NIO框架之一,它的健壮性、功能、性能、可定制性和可扩展性在同类框架中都是首屈一指的,它己经得到成百上千的商用项目验证,例如Hadoop的RPC框架avro使用Netty作为底层通信框架;很多其他业界主流的RPC框架,也使用Netty来构建高性能的异步通信能力。
通过对Netty的分析,我们将它的优点总结如下。


l.API使用简单,开发门槛低;
2.功能强大,预置了多种编解码功能,支持多种主流协议;
3.定制能力强,可以通过ChannelHandle对通信框架灵活扩展
4.性能高,通过与其他业界主流的NIO框架对比,Netty的综合性能最优成熟、稳定,Netty修复了已经发现的所有JDKNIOBUG,业务开发人员不需要再为NI0的BUG而烦恼;
5.社区活跃,版本法代周期短,发现的BUG    可以被及时修复,同时,更多的新功能会加入;
6.经历了大规模的商业应用考验,质量得到验证。在互联网、大数据、网络游戏、企业应用、电信软件等众多行业得到成功商用F    证明了它已经完全能够满足不同行业的商业应用了。

正是因为这些优点,Netty逐渐成为JavaNIO编程的首选框架。



2.7    总结

本章通过一个简单的demo开发,即时间服务器程序,让大家熟悉传统的同步阻塞1/0、伪异步I/0、非阻塞l/0(NIO)和异步1/0 (AIO)的编程和使用差异,然后对比了各自的优缺点,并给出了使用建议。
最后,我们详细介绍了为什么不建议读者朋友们直接使用JDK的NIO原生类库进行异步1/0的开发,同时对Netty的优点进行分析和总结,给出使用Netty进行NJO开发的理由。
相信学完本章之后,大家对Java    的网络编程已经有了初步的认识,从下一个章节开始,我们正式进入Netty的世界,学习和掌握基于Netty 的网络开发。

 

相关文章: