高性能聊天系统
本章以建立一个聊天系统为例,介绍如何使用J2SE 1.4非堵塞I/O(NI/O)为核心开发一个高性能的实时互动服务器系统,这个高性能服务器系统可以拓展为更广阔的应用,如游戏系统、社区系统或者数据实时采集系统。
1.1 系统需求
聊天交流是目前互联网提供的主要内容。聊天系统有多种实现方式,类似ICQ属于一种点对点的聊天系统,还有一种是基于Socket的集中式聊天系统,这种聊天系统需要登录统一的聊天服务器,每个人的聊天信息其他人都可以看到,类似一种会议室,当然,两个人之间也可以进行保密的私语。
在基于Socket的聊天系统中,主要有两种角色:服务器和客户端,不同的客户端登录集中式的服务器,通过服务器将一个客户端发出的信息推送到其他所有客户端。
基于Socket的聊天系统最早实现是使用网页刷新方式,通过客户端不断地自动刷新,将服务器端整个页面内容下载到客户端显示,这种方式的聊天速度慢,而且有刷屏现象,很快被更新的聊天技术所替代。
聊天系统在客户端和服务器之间主要传送的是文字信息,服务器端只需要把最新的文字信息推送到客户端,这样减少了网络传输内容,节省了网络传输的时间,无疑提高了聊天速度。这种“推”技术是目前基于Socket聊天系统的主要实现技术。
一个基于Socket的聊天系统有下列具体功能要求:
(1)客户端和服务器必须保持随时随地的连接。这有别于普通Web浏览的连接方式。在使用浏览器访问服务器时,先由客户端发出HTTP协议,然后服务器响应处理这个客户端的响应,再返回处理结果;请求(Request)和响应(Response)是一种一对一的前后因果关系。
而在基于Socket的聊天系统中,客户端发出聊天信息的同时,客户端也在接受服务器发送过来的其他人的聊天信息,因此,请求和响应不存在那种前后对应关系,是两种分别独立进行的进程。
因为服务器任何时候都可能发送信息到客户端,因此,客户端和服务器一旦建立连接,必须能让服务器在以后发送中寻找定位到这个连接。
(2)在速度性能方面,聊天系统提出了更高的要求。在网络连接的薄弱环节I/O通信方面,要求能够实现无堵塞地、流畅地数据读写。在面对几百个甚至更多的客户端同时发出连接信息的情况下,服务器要求能够保持高性能的并发处理机制,迅速地完成这几百个并发请求的处理和发送任务。
(3)在扩展性和伸缩性方面,聊天系统也提出了一定的要求。当一台服务器不能满足要求时,必须在客户端不知晓的情况下,通过不断增加服务器就能方便地拓展聊天系统的整体处理能力。对于客户端用户来说,这些服务器群都象征一个统一的服务器,不需要他们在进入聊天室之前先选择具体的服务器;也没有单个聊天室最大人数的限制,如果可以,服务器群可以支撑一个巨大容量的聊天室。
1.2 架构设计
本系统的设计核心是Socket底层通信,基于快速稳定的Socket底层通信架构,不但可以实现聊天系统,还可以实现其他如游戏、数据采取等实时性要求较高的系统,甚至可以建立一个快速的平台服务器系统。相比J2EE服务器系统,该平台系统的最大优势就是精简的代码带来的高性能。
当然,如果单纯追求高性能和速度,也许直接使用汇编就可以。使用Java设计这样的实时系统,实际还有一种很重要的目的,即追求高度的可扩展性和伸缩性。
因此,本系统设计必须将高性能和高伸缩性两个方面和谐地统一起来,不能盲目追求性能而破坏面向对象的编程风格和模式;也不能因为追求更大的重用性,建立太多复杂的中间层,其实这方面J2EE已经做得很好,有关J2EE的应用将在以后章节重点讨论。
当然,高性能应该是本系统的主要特色,为了实现本系统高效率的并发处理性能,设计上将采取Reactor模式实现自我快速触发;网络通信上,使用非堵塞I/O进行流畅地数据读写,在应用逻辑上,通过非堵塞的多线程技术实现并发功能处理。特别是J2SE 1.4以后版本推出的非堵塞I/O,将使得Java表现出和C语言同样的优越性能。
1.2.1 Java事件模型
事件只有在被触发时才会发生,它的发生是程序系统无法预料和计划的。例如,火警探测器只有在发生火警时才会触发,但是火警的发生是无法事先预料的,它处于时刻可能发生当中。为了能响应这些无法预料发生的事件,必须建立一套事件处理机制,以预备在发生事件时实现相应的处理。
通常,在一个事件处理框架里有下列3种角色。
· 源目标:事件发生者。
· 监视者:将监察侦听事件的发生,事件发生后,它会被通知到。
· 处理者:事件发生后,它将实现一定的行为动作,也就是处理事件。
在Java中有几种模式表达这3种角色之间的关系。
监视模式是一种最常用的模式,GOF《设计模式》中的观察者模式和监视模式类似,这将在以后章节讨论。
在监视模式中,监视者本身也兼任处理者的角色,3种角色被实现为两个独立对象,源目标为一个对象,而监视者和处理者两个角色同位于一个对象中。
例如,一个可视化JavaBean对象Button属于源目标,它通过addActionListener绑定一个监视者对象java.awt.event.ActionListener的子类来完成事件触发机制,当它被用户点按时,ActionListener的子类将完成相应点按事件的处理,如图1-1所示。
图1-1 Java GUI中的事件处理
以本系统为例,当用户输入服务器端地址和端口,单击连接按钮后,将启动连接远程服务器线程,其中部分重要方法如下:
connectButton.addActionListener(new ClientFrame_connectButton_actionAdapter(this));
void connectButton_actionPerformed(ActionEvent e) {
//启动连接服务器线程
NonBlockingSocket nonBlockingSocket = new NonBlockingSocket(url, port);
nonBlockingSocket.setDaemon(true);
nonBlockingSocket.start();
}
连接按钮connectButton通过addActionListener方法加入了一个监视者对象,监视者ClientFrame_connectButton_actionAdapter代码如下:
class ClientFrame_connectButton_actionAdapter
implements java.awt.event.ActionListener {
ClientFrame adaptee;
ClientFrame_connectButton_actionAdapter(ClientFrame adaptee) {
this.adaptee = adaptee;
}
public void actionPerformed(ActionEvent e) {
adaptee.connectButton_actionPerformed(e);
}
}
上述代码中,通过适配器模式将事件处理委托源目标实现,这就产生了第二种事件处理模式——委托模式。
委托模式使源目标、监视者和处理者3种角色各自实现为独立的3个对象,在这3个对象之间传输的是事件,这个模式在系统复杂时会经常使用,例如在J2EE的B/S架构中,前台JSP的表单提交事件后,由MVC模式中的Servlet获得事件数据,经过简单封装成事件对象后,委托给后台EJB层实现进一步处理,如图1-2所示。
图1-2中,客户端浏览器将表单事件直接提交到服务器端的JSP/Servlets,JSP/Servlets作为事件的监视者,接收到事件后,并不对事件立即进行处理,而是委托给JavaBeans/EJB进行复杂的逻辑或运算处理。
图1-2 B/S多层结构委托模式
以上两种模式特点是至少有两个对象分别代表事件的3个角色,而在Reactor模式中,则是在一个对象中绑定了这3种角色,Reactor的意思是自我触发、自主激活的意思。
在J2SE 1.4版本中的新特性非堵塞I/O(Nonblocking I/O)提供了基于Reactor模式的实现,这大大简化了基于Socket的应用和编程,如图1-3所示。
图1-3 Reactor模式
在非堵塞I/O API中,监视器是其一个重要的类Selector,被监视的源目标是可以被Selector联系的SelectableChannel(基本也属于Selector的相关部分),事件类型有:是否有接受的连接(OP_ACCEPT)、是否可以连接(OP_CONNECT)、是否可以读取(OP_READ)和是否可以写入(OP_WRITE)。
监视器Selector主要是监视这些事件,一旦发生,生成SelectionKey对象,Selector是自我触发、自我激活的,因此是典型的Reactor模式实现,其原理将在后面章节详细讨论。
但是,在非堵塞I/O API中,并不是由Selector来实现事件的处理,事件处理是由Selector激活出来后,通过其他处理器Handler来实现处理,开发者使用非堵塞I/O API需要做的工作就是:获取Selector激发的事件,然后根据相应事件类型,编制自己的处理器代码来进行具体处理。例如,如果是可读取事件,那么编制代码从SelectableChannel读取数据包,然后处理这个数据包。
在非堵塞I/O API中,使用Reactor模式将事件发生和事件处理两个部分实现分离解耦,事件发生部分只负责事件的激活,而事件处理由专门的处理器实现具体处理。
1.2.2 架构设计图
|
图1-4 架构层次图 |
考虑到系统的可重用性和伸缩性,需要将本系统的网络通信底层和应用系统分离开。这样,基于可重用的网络通信层,可以实现其他各种实时性较高的应用系统,同时,系统还需要提供一些基本功能支持,如网络连接状态管理以及用户状态相关管理,前者为实现一个动态的实时在线系统提供基本连接的管理,后者类似J2EE中Servlet部分的Session管理。
本系统在架构设计上将分3个层次,如图1-4所示。
本系统最底层是Socket通信层,将负责客户端和服务器之间快速的数据交换,它通过接口层和最上面应用层实现解耦,同时又通过接口层和应用层保持实时数据联系。用户从客户端进入到本系统前,将实现统一的用户登录验证机制。有关用户登录验证机制有多种实现方式,可以参见后面章节的讨论。
用户成功进入系统以后,将会有一个生存周期,生存周期依据不同底层协议有不同的具体实现。不管哪一种实现方式,都必须在内存中保存用户连接的相关状态,如用户的IP地址、用户最新连接时间等。
为了保证系统的安全性,用户在登陆验证通过后将分配一个随即的SessionID,用户的每次请求都将包含这个SessionID,服务器每次接受请求后,将此SessionID和保存在内存中的数据实现核对。
这里将着重讨论Socket底层以及接口层的设计和实现。Socket底层设计分两大部分:协议设计和连接处理设计;接口层的目的是提供底层和应用层一个中介媒体作用,但是不能设计得太复杂,以免延误数据传送时间。
1.2.3 协议设计
TCP是一种面向连接的协议,传输数据比较可靠。TCP协议中包含了专门的传递保证机制:当接收方收到发送方传来的信息时,会自动向发送方发出确认消息;发送方只有在接收到该确认消息之后才正式传送数据信息,否则将一直等待直到收到确认信息为止。
TCP在正式收发数据前,首先必须建立可靠的连接。一个TCP连接需要经过3次对话才能建立起来,其中的过程非常复杂。
基于TCP有各种会话应用协议,如HTTP、FTP等协议。其中,HTTP协议是Internet最常用的协议,其最大的特点是能够穿透各种防火墙,因此,传送数据包以HTTP协议传送是一个实用的选择。
UDP是面向非连接的协议,传送数据之前不需要建立专门的连接,直接发送就可以,因此速度要比TCP快。由于UDP协议并不提供数据传送的保证机制,因此可能发生丢包的情况。UDP适合一次只传送少量数据、对可靠性要求不高的应用环境。
基于以上因素,在本系统中,聊天信息属于一种短小信息,一般情况下可以使用UDP发送,但是为防止数据丢包,在UDP发送失败的情况下可采取TCP再发送一次,而传送的数据采取HTTP协议。这是一个基于TCP/UDP、使用HTTP协议传送数据的混合实现方案。
这种方案带来的最大特点就是通信速度快,服务器和客户端减少了等待连接时间,提高了发送和响应时间。特别对于服务器而言,由于所需TCP链接数量减少,降低了因为建立、维护和撤销TCP链接所带来的服务器负荷,提高了服务器的吞吐量。
本方案模式同样适用于无线通信领域,目前无线通信网络的带宽比较窄,特别是网络质量很不稳定,客户端如果位于J2ME手机端,采取UDP/TCP混合方案可以同时解决带宽窄和网络不稳定两个问题。
本系统的具体实现分服务器和客户端两个方面。客户端将首先采取UDP发送,在UDP发送失败的情况下,采取TCP再进行发送;服务器处理HTTP请求后,产生相应的HTTP响应,响应数据如果无法放进一个UDP数据包中,则要求客户端使用TCP重试。当然,也可以采取其他TCP/UDP选择方案,要求系统中,这种选择策略是可以替换的。
1.2.4 多线程
前面介绍了3种Java事件处理模式。在一个微观系统中,事件处理机制的实现主要是依靠Java线程来实现,一般监视者是一个线程,专门用于监测源目标的变化。
在 Java 程序中使用多线程要比在 C 或C++ 中容易得多,因为 Java 编程语言提供了语言级的支持,但是这并非意味着在使用时可以避开线程的一些基本问题。在以后章节中介绍的JSP/Servlet容器,实际是一个线程池容器,JSP在运行时将编译成Servlet,而Servlet是一种线程类,J2EE通过Servlet概念的提出,确保开发者不用担心线程以及同步等问题,可以像往常一样编程。
无论是开发独立多线程的Java Application或使用Servlet,有一个概念总是需要时刻注意:对同一资源访问时需要考虑同步(Synchronization)的问题。但是,同步使用需要慎重,过多使用反而会降低性能,甚至发生死锁(DeadLock),同步只是在复杂的情况下不得已使用的一种办法。在使用同步之前有两个因素需要仔细考虑:首先确定是否一定需要同步;然后确定被访问资源是否属于原子型(atomic)的。下面从这两个方面详细讨论一下。
在一些情况下,多线程访问同一个资料是不需要同步的,如读操作,针对方法体内局部变量的写操作也不需要同步,关键是对类变量的访问操作,一旦设置了类变量,那么就需要非常小心。采取类变量有两种形式,如下:
public class Test{
private int state;
private volatile long stateLong ;
private byte[] states = null;
private String stateStrs = null;
private final Object stateObject = null;
private HashMap map = new HashMap;
private Hashtable hashtable = new Hashtable;
public void setState(int state){
this.state = this.state + state;
}
…
}
在上面的Test类中,有5个类变量,分别代表5种不同的类变量:
(1)state的类型是整型(int),整型是属于Java原始型变量(primitive)。原始变量的操作访问都是原子型(atomic)的(long和double除外),因此对于原始型变量的操作访问都是线程安全的,不需要实现同步。
(2)对long和double操作访问可以加上volatile,如上面代码第3行。多线程工作中有主内存和工作内存之分,在JVM中有一个主内存,专门负责所有线程共享数据;而每个线程都有它自己私有的工作内存,volatile变量表示保证它必须是与主内存保持一致,它实际是变量的同步。但是,由于volatile在Java语言规范中表单不够详细,不是所有的Java虚拟机都支持volatile的。
(3)第4行是一个数组state,数组是属于对象(Object),因此,对数组state的访问必须使用Synchronization实现同步。当然,String也属于对象,因此使用时需要注意,在这种情况下还是有可能避免使用Synchronization,而使用Java的对象不变性(immutability)。
不变性对象就是指自从产生那一刻起就无法再改变的对象,一个对象如果有下列2种情况就属于不变性对象,对这些不变性对象的访问就无需使用同步。
(4)String之类对象。一旦赋值给String,该String对象的长度和内容都不会改变,如果要变化需通过同样性质的类StringBuffer来实现。
(5)使用final,这样就阻止对这个类再进行继承拓展的可能,而且可以提高JVM的效率,例如Test类中第6行。
一个类的所有类属性都是通过类的构造方法来设置,没有其他set之类的方法。
类似String的trim()或toUpperCase()这样的修改后数据结果是在另外一个对象中,String的trime()等方法并不是对自己本身对象进行修改,而是将结构保存到另外一个对象中。
因此,在实际编程中,尽量操作方法体内局部变量,这样就不需要考虑同步问题。如果必须做成类变量,那么,想办法使自己的类变量变成一个不变性对象,还是可以避免同步(Synchronization)的使用。
使用HashMap或Hashtable保存对象引用时也需要注意同步的问题。在向HashMap中加入新对象引用时,要使用同步方法;而Hashtable已经实现了内部同步,则在同样操作时不需要加同步,同样,List和Vector也是这样的关系。
在必须使用同步的情况下,要注意避免发生死锁。死锁的情况是:A线程试图访问一个资源对象,但这个对象正在被B线程访问,处于锁定状态,暂时无法使用,如果B一直不释放锁定,那么A线程就发生死锁现象。
避免死锁没有完全之策,只有根据自己的应用小心设计,有几种办法对避免死锁有所帮助。
(1)通过制造缩小同步范围,尽可能地实现代码块同步。
(2)如果使用wait,可以指定毫秒数,让它在一定时间后结束等待,避免死锁。因为wait是被notify()或notifyAll()唤醒的,要保证这两个方法确实能够唤醒wait。
(3)使用性能调试工具可以检测死锁现象发生,如Borland的Optimizeit Profiler。
程序系统中一定要避免死锁,线程死锁后会一直占据CPU,这称为Block,这时的CPU使用率100%,严重阻碍了其他线程的运行,降低了系统的性能,容易发生Block还有等待I/O的响应,现在有了非堵塞I/O的帮助,这个问题基本可以避免。
另外一个容易阻塞住CPU的使用就是死循环,使用while(true)这样的语句让线程进入死循环运行,这样的线程会一直占据CPU,解决办法很灵活,如下:
(1)使用while (!Thread.interrupted())代替while(true)语句,这样使得线程在执行错误时能够放弃对CPU独霸。
(2)在循环体内尽量加上sleep(1000L)这样的语句,这样让CPU有空闲处理其他线程。如果必须做到实时,那么考虑是否有其他应用上的前提限制,使用这些前提条件暂时阻止循环反复执行。
以本系统为例,在客户端需要将用户输入的聊天信息发往服务器,那么建立一个线程一直实现发送功能,由于客户端监视用户输入也有一个监视线程在运行,例如使用Swing实现时,ActionListener的具体实现将监视用户输入。
|
图1-5 队列Queue模式 |
这样有两个线程各司其职。一个线程负责监视输入,另外一个线程负责将输入发送出去。那么在这两个线程之间如何通信?最经常使用的办法是使用队列(Queue)模式。Queue模式是处理消息通信的基本办法,如图1-5所示。
在图1-5中,一个线程负责不断向Queue中加入新的对象,而另外一个线程则不断地从Queue中读取加入的对象,在Queue中,对象数据排着队等待提取。在Java中,LinkList是队列Queue的最好实现。
在本系统中应用Queue模式就有一个问题,加入动作是由用户输入决定的,一旦有用户输入,就会发生加入动作,这由Swing的ActionListener负责,那么,提取线程会在队列另外一端进入死循环不断地读取,这样才能在队列中一旦有对象事件时,能够被立即提取出来,因此必须使用while (!Thread.interrupted())实现死循环。
但是必须注意到,每次循环中的提取动作执行是有前提条件的——队列中有对象事件。如果在有对象事件时,通知提取线程,这样可以避免提取线程一直霸占CPU “傻等”,使用线程的wait()和notifyAll()可以达到这个目的。
提取线程的循环体内设置wait()进行中断等待,加入对象后,执行notifyAll(),这样提取线程将中断等待,从Queue中读取加入的对象。线程在中断等待时,将释放CPU的霸占,这样就有效率地利用了CPU,如图1-6所示。
图1-6 改进后的队列Queue模式
由此可见,并不是说使用了多线程就能提高系统性能,更重要的是还要注意提高CPU使用效率,防止Block发生。
提高多线程的使用效率还必须了解下列几点:
(1)线程运行的次序并不是按照创建它们时的顺序来运行的,CPU处理线程的顺序是不确定的。如果需要确定,那么必须手工介入,使用setPriority()方法设置优先级,但是这种方法在Windows NT下有时也不一定有效果。
(2)要避免大量线程运行时发生堵塞现象,可以通过设置线程优先级来实现,但是同时又必须注意到,在大量线程被堵塞时,最高优先级的线程先运行,但是不表示低级别线程不会运行,只是运行概率较小而已。
(3)使用yield()会自动放弃CPU,有时比sleep更能提升性能。
(4)检查所有可能Block的地方,尽可能多地使用sleep或yield()以及wait();尽可能延长sleep(毫秒数)的时间;运行的线程不能超过100个;要注意到不同平台Linux或Windows以及不同JVM运行性能差别很大。
1.2.6 非堵塞I/O
传统网络系统运行的性能瓶颈通常在I/O读写,包括对端口和文件的操作。过去,当打开一个Socket的I/O通道后,使用下列语句:
Socket socket = new Socket(url,port);
InputStream in = socket.getInputStream();
while(!Thread.interrupted()){
int byteRead = in.read();
…
}
其中,read()守候在端口边不断读取传输过来的字节内容,如果读取不到任何字节内容,read()也只能等待,这会使得整个程序系统被锁住,影响程序系统继续做其他事情。一个普遍的改进办法就是开设线程,让线程去等待或轮询。但是,这种做法相当耗费资源的。更主要的是,当线程轮询后,若有事件发生,只有等到线程下次轮询时才会知道。即没有一种这样的途径:当有事件发生时,能够主动发出通知。
在前面讨论中,非堵塞I/O 作为Reactor模式的实现,实际上提供了一种事件发生、自我激活、主动通知的机制。因此使用非堵塞I/O将大大提高系统的I/O处理性能。
非堵塞I/O中有3个重要的类:Selector、SelectionKey 和Channel。
Selector 实际是一个Reactor类,主要监视管理一系列SelectionKey,每个SelectionKey代表一种Channel和Selector之间的关系。在某个Channel上如果发生某种连接事件,Selector将会自动激活产生一个SelectionKey对象。即SelectionKey属于事件对象(Event Object),是动态的,每发生一个连接事件就产生一个SelectionKey对象。
从被激活的SelectionKey中,外界可以知道每个Channel发生的具体事件类型,这些事件包括:是否发生连接、是否可以读或者是否可以写等。
从以上分析可以看出,Selector有自我激活的能力。使用Selector时,只要告诉它需要关注的特定事件,Selector将会一直监视这种特定事件,一旦发生,就发出通知。类似火警警报器,一旦发现失火事件,立即会主动激活报警。
由于Selector只负责事件发生,不负责事件处理,事件处理是由开发者编制程序实现,因此,使用者需要自己建立一套获取发生事件的机制。
总之,非堵塞I/O的使用包括两大部分:注册事件和获取事件。下面将简单演示一下这两部分的使用实现。
首先,需要向Selector注册外界感兴趣的事件,创建Selector对象如下:
Selector selector = Selector.open();
或
Selector selector = SelectorProvider.provider().openSelector();
Selector是一个观察者,那么它观察谁?当然是连接通道Channel,这种Channel是一种SelectableChannel,即可以和Selector发生联系的Channel。SelectableChannel常用的有两种:SocketChannel和ServerSocketChannel,这两种SelectableChannel的区别是可注册的事件不一样:
- ServerSocketChannel 对应事件:OP_ACCEPT。
- SocketChannel 对应事件:OP_CONNECT、OP_READ、OP_WRITE。
后者一般使用在服务器端,可以从中知道有无可以接受的客户端连接。创建ServerSocketChannel如下:
ServerSocketChannel sc = ServerSocketChannel.open();
创建一个ServerSocketChannel后,需要将其和主机端口进行绑定,例如和192.168.0.1的8009端口绑定:
InetSocketAddress address = new InetSocketAddress("192.168.0.1", 8009);
sc.socket().bind(address);
sc.configureBlocking(false); //设置为非堵塞
现在如果需要从这个ServerSocketChanne了解有无可接受的客户端连接,语法如下:
SelectionKey acceptKey = sc.register( selector, SelectionKey.OP_ACCEPT )
上面一条语句是用Selector注册ServerSocketChannel实例,返回一个Key实例,通常SelectionKey对象都是线程安全型的,但是修改感兴趣的事件操作时,这个方法是被标记为同步的,即在调用interestOps()方法时会锁定一段时间,因此,实际应用中,如果有一个以上的线程来调用同一个Selector对象时,需要使用Selector.wakeup()来解锁。
以上非堵塞I/O的注册事件工作已经准备就绪,那么,在正常运行中,如果Selector发现了事先注册的事件,如何传递出来呢? 这就需要建立一个事件获取通道。
其实,获取事件时,只要执行语句selector.select(),这将触发系统内部自动检查所有使用这个Selector注册的Channel状态。如果在某个Channel发现有感兴趣事件发生,这条语句的返回结果将大于0,通过这个信息外界可以知道有网络连接事件发生了,那么又如何知道是哪个具体Channel发生的呢?
使用selector.selectedKeys()获取一个SelectionKey结果集,遍历这个结果集,通过每个SelectionKey就可以找到发生事件的Channel,这样可以从这个Channel进行读写数据,如图1-7所示。
图1-7 非堵塞I/O原理图
图1-7基本展示了非堵塞I/O的原理结构,这部分结构主要实现了Reactor模式中的事件到达部分,当有读或写等任何实现注册的事件发生时,可以从Selector中获得相应的SelectionKey,从SelectionKey可以找到相应的Channel,从而能获得客户端发送过来的数据。
1.3 Socket核心设计和实现
通过前面的系统设计,基本解决了本系统实现的主要技术问题,下面将就具体的实现细节展开讨论。
整个系统的核心底层是非堵塞I/O技术,通过使用这一新技术,可以实现底层网络I/O的无堵塞、流畅地读写,为整个系统的高性能运行奠定了坚实的基础。
非堵塞的Socket I/O有两大部分:服务器和客户端。在两端都将采取这一新技术,根据TCP/UDP不同,又分别有两套Socket详细实现。
非堵塞的Socket I/O实现和以往堵塞I/O的实现在编程上有所不同,以前堵塞I/O的Socket读写是一种被动行为,即new Socket这些语句可以根据自己系统的应用要求放置在任何位置,可以由程序员自己任意安排的,而非堵塞I/O的Socket读写则不一样,它类似一个主动的、有自己“意志”行为的独立线程(因为使用了Reactor模式),所以,什么时候读取数据,什么时候写入数据不是由程序员自己能掌控的,是由Selector决定的。
因此这两种I/O模式的不同使用,决定了不同的编程模式和思维习惯,从堵塞I/O转到非堵塞I/O上,对程序员有些考验。
下面将先讨论服务器的Socket非堵塞I/O实现。
1.3.1 TCP和Reactor模式
Reactor模式是属于一种自我触发、自我激活的模式,非堵塞I/O的Selector实现了Reactor模式主要部分,将连接事件自我触发,以SelectionKey事件形式爆发出来。
因此,只要建立一个线程类,反复检查Selector中是否有触发的SelectionKey,如果有,就再次触发对这些事件进行相应的处理,建立一个名为Reactor的线程类。在这个Reactor类中,将使用Selector的事件触发机制,触发本应用系统的事件处理机制。
首先以基于TCP连接的Socket实现为例,在Doug Lea 的《Scalable IO in Java》这本电子文档中,使用Reactor模式很好地实现了事件的自触发机制,如图1-8所示。
图1-8 服务器处理连接事件
在图1-8中,Reactor向Selector注册了一个关于TCP的连接事件OP_ACCEPT(是否有可接受的连接事件)。当客户端第一次开始连接服务器时,OP_ACCEPT事件将激活,Reactor将检测到这个激活事件对象SelectionKey,从SelectionKey的attachment中获取Acceptor线程对象,直接运行Acceptor线程。
Acceptor将完成两件事情:
(1)向Selector注册了一个新的连接事件OP_READ(是否可以读取数据)。这是假设客户端一旦连接上服务器后,将首先向服务器发送数据,一旦TCP连接握手成功,服务器首先要处于准备读取数据的状态。
(2)更改SelectionKey中的attachment,修改为Handler线程对象,这是一个处理读取或写入数据的线程类。
当客户端发送数据到服务器时,可读取事件OP_READ发生了,Reactor又检测到这个事件对象SelectionKey,从SelectionKey的attachment中获取Handler线程对象,立即运行这个线程。
Handler线程从SelectionKey中提取SocketChannel,再从这个Channel中读取数据,然后向Selector注册一个新的连接事件OP_WRITE,以便服务器在处理完成读取的数据后,再写入发送到客户端。
当OP_WRITER事件发生时,Handler线程又开始运行,这次是向SocketChannel写入数据,写入完成后,向Selector再注册新的连接事件OP_READ,这样一个请求/响应模式的数据处理基本完成,准备进入下一个循环。
创建Reactor类如下(程序1-1):
程序1-1
public class TCPReactor implements Runnable {
private final static String module = TCPReactor.class.getName();
private final Selector selector; //Selector 实例
private final ServerSocketChannel sc; //SeletableCannel一个实现
public TCPReactor (int port) throws IOException {
selector = Selector.open(); //创建Selector实例
sc = ServerSocketChannel.open(); //创建ServerSocketChannel实例
InetSocketAddress address =
new InetSocketAddress(InetAddress.getLocalHost(), port);
sc.socket().bind(address); //绑定ServerSocketChannel
Debug.logVerbose("-->Start host:"+ InetAddress.getLocalHost()+" port=" + port);
sc.configureBlocking(false); //设置为非堵塞
//向selector注册该channel感兴趣的事件为OP_ACCEPT
SelectionKey sk = sc.register(selector, SelectionKey.OP_ACCEPT);
//利用sk的attache功能绑定Acceptor 如果有事件触发Acceptor
sk.attach(new Acceptor(selector, sc));
Debug.logVerbose("-->attach(new Acceptor()!");
}
public void run() {
try {
while (!Thread.interrupted()) { //反复运行,检查是否有触发的key
selector.select();
Set selected = selector.selectedKeys();
Iterator it = selected.iterator();
//Selector如果发现channel有事件发生,进行key的遍历
while (it.hasNext())
//来一个事件 第一次触发一个accepter线程
//以后触发SocketReadHandler
dispatch( (SelectionKey) (it.next()));
selected.clear();
}
} catch (IOException ex) {
Debug.logError("reactor stop!" + ex, module);
}
}
//运行Acceptor或SocketReadHandler
private void dispatch(SelectionKey k) {
Runnable r = (Runnable) (k.attachment());
if (r != null) {
Debug.logVerbose("-->dispatch running");
r.run();
}
}
}
在线程的run()方法中,通过while (!Thread.interrupted())语句不断地对Selector进行事件检查,一旦有事先注册的关注的事件发生,运行dispatch(SelectionKey k)方法进行分配处理,在dispatch方法中,从SelectionKey的attachment中获得的一个线程,然后启动这个线程,这样,获取发生事件后,同时也驱动了对事件的进一步处理。
那么,这个线程是如何被赋予SelectionKey呢?原来在前面有一句:
sk.attach(new Acceptor(selector, sc));
SlectionKey有两种处理附件attachment的方法:
public abstract class SelectionKey
{
…
public final Object attach (Object ob) //类似setAttachment
public final Object attachment( ) //类似getAttachment
…
}
首先通过attach将一个对象和SelectionKey发生联系,然后再通过attachment( )获得这个对象,这个对象可以是任何业务对象、处理器或另外一个Channel。attach只是保存对象的引用,在使用完成这个功能后,要使用attach(null)来清除附件对象的引用,以便垃圾回收机制能够回收这个附件对象。
SelectionKey注册自己的特定对象用如下语句:
SelectionKey key = channel.register (selector, SelectionKey.OP_READ, myObject);
等同于下列语句:
SelectionKey key = channel.register (selector, SelectionKey.OP_READ);
key.attach (myObject);
本系统中是采取后者做法:
sk.attach(new Acceptor(selector, sc));
那么被attach的线程对象Accpetor是对事件实行进一步处理的,注意一下事先注册的事件是SelectionKey.OP_ACCEPT,即系统运行开始时,第一个关注的事件总是OP_ACCEPT:是否有可接受的网络连接,这是服务器运行后一直应该关注的头等事件。
如果有这样的事件发生,那么就会激活Accpetor线程对象,从而启动一个Acceptor线程,在这个线程中,将准备下一步工作,就是再向Selector注册其他事件,例如这个连接是否可以读出或是否可以写入数据等。代码如下(程序1-2):
程序1-2
public class Acceptor implements Runnable {
private final Selector selector;
private final ServerSocketChannel ssc;
public Acceptor(Selector selector, ServerSocketChannel ssc) {
this.selector = selector;
this.ssc = ssc;
}
public void run() {
try {
Debug.logVerbose("-->ready for accept!");
SocketChannel sc = ssc.accept();
if (sc != null) {
sc.configureBlocking(false); //设定为非堵塞
SelectionKey sk = sc.register(selector, 0); //注册这个SocketChannel
//同时将SelectionKey标记为可读,以便读取
sk.interestOps(SelectionKey.OP_READ);
selector.wakeup(); //因为interestOps,防止Selector死锁
sk.attach(new Handler(sk, sc)); //携带Handler对象
}
} catch (Exception ex) {
Debug.logVerbose("accept stop!" + ex);
}
}
}
在Accpetor中,从ServerSocketChannel获得SocketChannel实例,这两个Channel可注册的事件是不一样的,后者可以注册是否可读或可写等事件。Accpetor代码中注册了是否可以读SelectionKey.OP_READ的事件,然后attach了Handler线程对象。
这样,Selector将一直关注OP_READ事件,一旦有这类事件发生,将激活attachment为Handler线程的运行。Handler在可读事件发生后启动,就是从SocketChannel中读取客户端传送的数据了,Handler代码如下(程序1-3):
程序1-3
public class TCPHandler implements Runnable {
private final static String module = TCPHandler.class.getName();
private final SocketChannel sc;
private final SelectionKey sk;
private SocketHelper socketHelper; //Socket读写帮助类
public TCPHandler (SelectionKey sk, SocketChannel sc) throws IOException {
this.sc = sc;
this.sk = sk;
socketHelper = new SocketHelper();
Debug.logVerbose(" SocketReadHandler prepare ...", module);
}
public void run() { //线程run方法
Debug.logVerbose("Handler running ...", module);
try {
if (state == READING) read(); //读取数据
else if (state == SENDING) send(); //写入数据
} catch (Exception ex) {
Debug.logError("readRequest error:" + ex, module);
socketHelper.close(sc);
sk.cancel();
}
}
//从SocketChannel中读取数据
private void read() throws Exception{
try {
//从Socket中读取byte[]数组
byte[] bytes = socketHelper.readSocket(sc);
if (bytes.length == 0) throw new Exception();
//实现服务器聊天核心处理功能,这里暂时打印出来,方便测试
System.out.println(" ge result is :" + new String(bytes));
state=SENDING;
sk.interestOps(SelectionKey.OP_WRITE); //注册新的事件
} catch (Exception ex) {
throw new Exception(ex);
}
}
//向SocketChannel写入数据
private void send()throws Exception{
try {
//写入测试数据
String request1 = "come back";
System.out.println(" send result is :" + request1);
socketHelper.writeSocket(request1.getBytes(),sc);
state=READING;
sk.interestOps(SelectionKey.OP_READ);
} catch (Exception ex) {
throw new Exception(ex);
}
}
}
在Handler的read方法中,简单地从SocketChannel中读取Message一个实例,然后打印出来,下一步可以在这里启动新的线程,进行聊天具体处理。如获取这个聊天信息的接受方用户ID,然后以用户ID寻找出它的SocketChannel,从而向对方用户发出该信息。
为了实现一个分布式的服务器环境,可以使用JMS这样的消息处理系统,通过查询该服务器内的用户名单,如果对方用户ID不是登录本服务器,那么通过JMS将消息发送给它。
1.3.2 UDP实现
1.3.1节是实现了TCP的Socket实现,在本系统中,最经常使用的还是UDP。UDP的Socket实现相比TCP要简单一点,只有两个连接事件:OP_READ和OP_WRITE。因为没有连接,所以没有OP_ACCEPT或OP_CONNECT这样建立连接的事件发生。
整个流程顺序类似TCP的实现,创建UDPReactor类如下(程序1-4):
程序1-4
public class UDPReactor implements Runnable {
private final static String module = UDPReactor.class.getName();
private final Selector selector;
public UDPReactor(int port) throws IOException {
selector = Selector.open();
InetSocketAddress address =
new InetSocketAddress(InetAddress.getLocalHost(), port);
DatagramChannel channelRec = openDatagramChannel();
channelRec.socket().bind(address); //绑定socketAddress
//向selector注册该channel
SelectionKey key = channelRec.register(selector, SelectionKey.OP_READ);
key.attach(new UDPHandler(key, channelRec));
}
//生成一个DatagramChannel实例
private DatagramChannel openDatagramChannel() {
DatagramChannel channel = null;
try {
channel = DatagramChannel.open();
channel.configureBlocking(false);
} catch (Exception e) {
Debug.logError(e, module);
}
return channel;
}
public void run() { // normally in a new Thread
try {
while (!Thread.interrupted()) {
selector.select();
Set selected = selector.selectedKeys();
Iterator it = selected.iterator();
while (it.hasNext())
//来一个事件 第一次触发一个accepter线程
//以后触发SocketReadHandler
dispatch( (SelectionKey) (it.next()));
selected.clear();
}
} catch (IOException ex) {
Debug.logError("reactor stop!" + ex, module);
}
}
//运行Acceptor或SocketReadHandler
private void dispatch(SelectionKey k) {
Runnable r = (Runnable) (k.attachment());
if (r != null) {
Debug.logVerbose("-->dispatch running");
r.run();
}
}
}
与TCP不同的就是使用DatagramChannel来代替了TCP中的ServerSocketChannel和SocketChannel,DatagramChannel是专门用于UDP的SelectableChannel。
另外一个不同点是,UDP中没有了Acceptor类,不用专门处理建立事件的相关事件,直接读取或写入数据,建立Handler如下(程序1-5):
程序1-5
public class UDPHandler implements Runnable {
private final static String module = UDPHandler.class.getName();
private final DatagramChannel datagramChannel;
private final SelectionKey key;
private static final int READING = 0, SENDING = 1;
private int state = READING;
private SocketAddress address = null;
public UDPHandler(SelectionKey key, DatagramChannel datagramChannel) throws
IOException {
this.datagramChannel = datagramChannel;
this.key = key;
Debug.logVerbose(" UDPHandler prepare ...", module);
}
public void run() { //线程运行方法
Debug.logVerbose(" UDPHandler running ...", module);
try {
if (state == READING) read();
else if (state == SENDING) send();
} catch (Exception ex) {
Debug.logError("readRequest .. error:" + ex, module);
close(datagramChannel);
key.cancel();
}
}
//从datagramChannel读取数据
private void read() throws Exception {
try {
//这里应该是聊天服务器处理聊天信息的核心功能
//下面只是测试用代码,直接从客户端读取字符串并打印出来
byte[] array = new byte[512];
ByteBuffer buffer = ByteBuffer.wrap(array);
address = datagramChannel.receive(buffer);
String str = new String(array);
System.out.println("handlePendingReads() :" + str);
state = SENDING;
key.interestOps(SelectionKey.OP_WRITE); //注册为可写事件
} catch (Exception ex) {
Debug.logError("readRequest .. error:" + ex, module);
}
}
//向datagramChannel写入数据
private void send() throws Exception {
try {
//以下是测试发送数据
String request = "come back";
ByteBuffer buffer1 = ByteBuffer.wrap(request.getBytes());
datagramChannel.send(buffer1, address);
System.out.println("send :" + request);
state = READING;
key.interestOps(SelectionKey.OP_READ);
} catch (Exception ex) {
Debug.logError("readRequest .. error:" + ex, module);
}
}
…
}
在这里,Handler中实现了从datagramChannel读取数据和写入数据功能。
至此,整个服务器底层核心功能基本完成。这部分功能的性能很大程度上影响了整个系统的运行性能,因此需要对之实行单独性能测试。为了方便测试,先要开发出客户端测试程序,通过多台客户端多线程地不断对服务器进行连接访问和数据传送,从而可以对Java的这些最新技术提供的性能参数有一个彻底实际的了解。
1.3.3 客户端实现
为了使客户端的I/O读写能够保持流畅,客户端的Socket读写也使用非堵塞I/O实现,在前面章节中已经讨论过,由于非堵塞I/O本身类似一个独立自主的Reactor模式,而客户端界面输入也是一个事件监视模式,因此,要实现这两个独立模式之间的数据通信,需要使用队列Queue模式。
首先建立TCP客户端非堵塞I/O类TCPClient如下(程序1-6):
程序1-6
public class TCPClient extends Thread {
private final static String module = TCPClient.class.getName();
//这是一个消息队列,用于和前台客户端界面输入实现通信
private final static MessageList messageList = MessageList.getInstance();
private InetSocketAddress socketAddress;
private Selector selector;
private SocketHelper socketHelper; //Socket读写帮助
public TCPClient(String url, int port) {
try {
socketAddress = new InetSocketAddress(url, port);
selector = Selector.open();
openSocketChannel(); //开启一个SocketChannel
socketHelper = new SocketHelper();
} catch (Exception e) {
Debug.logError("init error:" + e, module);
}
}
//直接开启一个SocketChannel
private SocketChannel openSocketChannel() {
SocketChannel channel = null;
try {
channel = SocketChannel.open();
channel.configureBlocking(false);
channel.connect(this.socketAddress); //绑定SocketAddress
channel.register(selector, SelectionKey.OP_CONNECT); //注册OP_CONNECT事件
} catch (Exception e) {
Debug.logError(e, module);
}
return channel;
}
public void run() { //线程运行方法
try {
while (!Thread.interrupted()) {
if (selector.select(30) > 0) { //为防止底层堵塞,设置TimeOutt事件
doSelector(selector);
}
}
} catch (Exception e) {
Debug.logError("run error:" + e, module);
}
}
//分别获取触发的事件对象SelectionKey
private void doSelector(Selector selector) throws Exception {
Set readyKeys = selector.selectedKeys();
Iterator readyItor = readyKeys.iterator();
while (readyItor.hasNext()) {
SelectionKey key = (SelectionKey) readyItor.next();
readyItor.remove();
doKey(key);
readyKeys.clear();
}
}
private void doKey(SelectionKey key) throws Exception {
SocketChannel keyChannel = null;
try {
keyChannel = (SocketChannel) key.channel();
if (key.isConnectable()) { //如果连接成功
if (keyChannel.isConnectionPending()) {
keyChannel.finishConnect();
}
Debug.logVerbose(" connected the server", module);
sendRequest(keyChannel); //首先发送数据
key.interestOps(SelectionKey.OP_READ); //注册为可写
} else if (key.isReadable()) { //如果可以从服务器读取response数据
readResponse(keyChannel);
key.interestOps(SelectionKey.OP_WRITE);
} else if (key.isWritable()) { //如果可以向服务器发送request数据
sendRequest(keyChannel);
key.interestOps(SelectionKey.OP_READ);
}
} catch (Exception e) {
Debug.logError("run error:" + e, module);
socketHelper.close(keyChannel);
throw new Exception(e);
}
}
//向服务器发送信息
private void sendRequest(SocketChannel keyChannel) throws Exception {
try {
Message request = messageList.removeReqFirst(); //获取队列中的数据
String strs = (String)request.getObject();
Debug.logVerbose(" send the request to the server =" + strs, module);
//写入Socket
socketHelper.writeSocket(strs.getBytes("UTF-8"), keyChannel);
} catch (Exception e) {
Debug.logError(e, module);
throw new Exception(e);
}
}
//从服务器读取信息
private void readResponse(SocketChannel keyChannel) throws Exception {
try {
byte[] bytes = socketHelper.readSocket(keyChannel); //从Socket读取数组字节
Debug.logVerbose(" read the response from the server:" +
new String(bytes), module);
//实现其他处理,如在客户端屏幕显示服务器的反应信息
} catch (Exception e) {
Debug.logError(e, module);
throw new Exception(e);
}
}
public static void main(String[] args) {
Debug.logVerbose("Starting client...", module);
try {
String url = "220.112.110.61";
int port = 81;
TCPClient nonBlockingSocket = new TCPClient(url, port);
nonBlockingSocket.start();
Debug.logVerbose("create a request ...", module);
//连续发送100条信息到客户端
for(int i=0; i<100; i++){
Message request = new ObjectMessage(i + "hello I am Peng" + i);
messageList.pushRequest(request);
}
} catch (Exception e) {
Debug.logError("Client start error:" + e);
}
Debug.logVerbose("Client started ...", module);
}
}
该TCPClient代码与服务器端的代码结构有所不同,其实可以一样的实现。这里只是说明一下非堵塞I/O API的多种调用写法。
与服务器端不同的是,这里的Reactor模式是在一个线程类中实现,这对于小数量的客户端I/O来说是可以允许的。但是在服务器端,由于有很多连接,如果像客户端这样,将Socket的读写操作和Socket侦听合并在一个线程中完成,会降低服务器的处理性能。因此在服务器端专门设立了线程类Handler来处理Socket的读写操作,将读写操作委托给Handler线程后,Reactor自己可以有更多精力做好侦听工作。当然,对于繁忙的服务器,也可以设立多个Reactor同时侦听,这样服务器的灵敏度就更高。
相比而下,客户端的I/O灵敏度无需如此复杂,只要能保持流畅读写就可以,因此整个Socket侦听和读写都集中在一个类中实现。
为了对TCP的非堵塞I/O进行测试,实现了下列语句:
for(int i=0; i<100; i++){
Message request = new ObjectMessage(i + "hello I am Peng" + i);
messageList.pushRequest(request);
}
这是向服务器端连续发送了100个信息请求。实现这样的功能,在堵塞I/O中只要直接调用Socket向里面写数据就可以了,但是在非堵塞I/O中,什么时候能读、什么时候能写不能在代码编写时决定,只能在运行时,根据事件触发来实现。因此就使用了一个队列Queue,只要把需要发送的信息数据放在这个Queue中,然后由Reactor根据自己的情况从Queue中读取发送出去。
messageList.pushRequest()方法就是向这个Queue中放入数据。为了防止Reactor在Queue中没有数据时还在不断地读取,这里使用了线程的触发机制,当Queue中为空时,读取Queue的线程处于等待暂停状态;一旦有数据放入,就触发读取线程开始读取。这样也是为了防止读取线程发生堵塞,完全独霸CPU,导致其他线程不能正常运行。
MessageList代码如下(程序1-7):
程序1-7
/**
* <p>Copyright: Jdon.com Copyright (c) 2003</p>
* <p>Company: 上海解道计算机技术有限公司</p>
* @author banq
* @version 1.0
*/
public class MessageList {
private final static String module = MessageList.class.getName();
//Request信号的Queue
private LinkedList requestList = new LinkedList();
//Response信号的Queue
private LinkedList responseList = new LinkedList();
//使用单态模式保证当前JVM中只有一个MessageList实例
private static MessageList messageList = new MessageList();
public static MessageList getInstance(){
return messageList;
}
//加入数据
public void pushRequest(Message requestMsg) {
synchronized (requestList) {
requestList.add(requestMsg);
requestList.notifyAll(); //提醒锁在requestList的其他线程
}
}
//取出Queue中第一数据
public synchronized Message removeReqFirst() {
synchronized (requestList) {
// 如果没有数据,就锁定在这里
while (requestList.isEmpty()) {
try {
requestList.wait(); //等待解锁 等待加入数据后的提醒
} catch (InterruptedException ie) {}
}
return (Message) requestList.removeFirst();
}
}
}
在这个MessageList中有两个LinkeList,分别是请求信号Queue和响应信号Queue,为了确保MessageList是全局惟一的实例,这里使用了单态模式。
单态模式就是一种保证一个类只有一个实例的模式,单态模式在Java中经常使用,该模式将在以后章节详细介绍。
在removeReqFirst()方法中,如果当前Queue中为空,就实现线程锁等待,这样节省了CPU占用时间,实现了高效率运行。当pushRequest方法被调用时,通过requestList.notifyAll()通知所有锁住requestList等线程将可以继续运行。虽然MessageList本身不是一个线程,但是它的方法是提供线程调用的。
MessageList在本系统设计中非常重要,这将在以后章节进一步讨论。
实现完成客户端测试程序后,可以进行连接测试,先启动服务器端Socket,然后直接启动TCPCLient,客户端屏幕显示结果如下:
03-8-15 11:49:17 connected the server
03-8-15 11:49:17 send the request to the server =0hello I am Peng0
03-8-15 11:49:17 read the response from the server: com back
03-8-15 11:49:17 Client started ...
03-8-15 11:49:17 send the request to the server =1hello I am Peng1
03-8-15 11:49:17 read the response from the server: com back
03-8-15 11:49:17 send the request to the server =2hello I am Peng2
03-8-15 11:49:17 read the response from the server: com back
03-8-15 11:49:17 send the request to the server =3hello I am Peng3
…
服务器端屏幕结果输出如下:
03-8-15 14:28:40 [com.jdon.jserver.Server] begin to read config file
03-8-15 14:28:40 [com.jdon.jserver.Server] Server Port=81
03-8-15 14:28:40 [Debug:Verbose] -->Start host:peng-althon/220.112.110.61 port=81
03-8-15 14:28:40 [Debug:Verbose] -->Start serverSocket.register!
03-8-15 14:28:40 [Debug:Verbose] -->attach(new Acceptor()!
03-8-15 14:28:40 [Debug:com.jdon.jserver.Server:Verbose] Server started ...
ge result is :0hello I am Peng0
ge result is :1hello I am Peng1
ge result is :2hello I am Peng2
该测试试验结果表明,以Reactor模式建立的Socket底层网络通信已经正常运行,可以在其基础上进行应用层的深入开发。当然首先要做好Socket底层和应用层的接口工作,一个具有良好的拓展性和伸缩性的接口系统可以保证Socket底层代码的最大重用性,从而为平台服务器软件的开发奠定坚实的基础。
1.4 Socket接口设计和实现
完成了Socket底层设计后,需要设计接口层以便和应用层对接。在一般情况下,大多数程序员会直接将应用核心相关代码写入Socket底层代码中,这样就会造成应用层和底层紧密耦合在一起,如果开发其他应用系统,还需要修改Socket底层代码。
在前面测试调试中已经发现,Socket底层的非堵塞I/O代码相对传统的堵塞I/O来说,在调试和运行上要困难和麻烦一些。因此,一旦调试运行正常,一般程序员都不希望再次修改Socket底层代码,因此,需要将这些底层代码与外界调用分离开来,形成一个“黑匣子”,只提供一个惟一通道和外界发生关系,这种关系也必须是松散的,而不是紧密的。
这种通道以及松散的关系就是设计接口层的目的,接口层是实现Socket底层和应用层的接口部分,因此接口层主要是实现两者数据之间联系以及转换的“琐碎”事情,包含很多小的“转换器”。
1.4.1 队列和对象类型
其实这样的接口设计已经在前面章节提到,使用了队列Queue模式,由于非堵塞I/O的自我触发、独立运行的模式,外界无法直接控制非堵塞I/O的数据读写。因此,设计一个队列Queue,将需要发送和接受的数据放在这个队列中,这样,应用系统层和Socket底层就可以实现数据的交换。队列Queue是底层与应用层之间的连接通道,这种通道连接是一种松散的联系,如图1-8所示。
图1-8 请求信号发生触发流程图
图1-8表示一个请求信号由客户端应用层产生后,将被放入队列Queue中,然后如图1-6所示的队列Queue触发机制,在非堵塞I/O的Selector发现网络可写时,从Queue中读取这个请求信号,发送到服务器端;在服务器端,非堵塞I/O的Selector发现有数据后,触发可读事件,接受客户端的数据请求,然后再放入服务器端的Queue中,等待服务器应用层从Queue中读取处理。图1-8是一个请求信号的路线图,服务器响应信号类似。
但是,现在有一个比较琐碎、好像不起眼的难题需要解决,也就是Java的数据类型问题。客户端应用层产生请求Request时,这些Request可能是字符串,也可能是图片等普通对象Object,放入Queue中后,当再从Queue中取出时,已经很难分辨它们的类型。如何解决这个问题?
在已经确定这些对象的类型后,还要实现数据类型转换工作,将这些请求对象转换为非堵塞I/O可读写的ByteBuffer流,也就是通常讲的序列化。同样的工作也发生在服务器,在服务器端需要将ByteBuffer流反序列化变为原始的请求对象类型,然后放入服务器端的Queue中,当服务器的应用层从Queue中取出这些对象时,又需要进行一次“身份”鉴别。
以上只是请求Request的一个发送处理过程,还有另外一个响应Response的处理发送过程。纵观这两种信号的处理过程,有相似点,也有非相似点。如果不进行巧妙设计,单就每个环节都用比较死板的代码实现,代码就会变得琐碎而复杂,这容易导致问题发生,影响系统的稳定性和拓展性。
如果仔细整理一下这些琐碎、似乎毫不起眼的事情时,会有惊人的发现。
首先,将这个接口层涉及到的几个基本对象和事件整理如下:
- 信号类型:请求信号和响应信号。
- 数据类型:字符串和普通对象,以后可能扩展增加。
- Queue操作:加入Queue和从Queue中取出。
这里一共有6种对象或动作,这6种对象和动作搭配组合的数量是巨大的,例如:一个请求信号可能是字符串,也可能是普通对象,无论是这两种数据类型中的哪一种,都需要加入Queue和从Queue中取出。如果仔细考虑Queue的放入和读取动作,服务器端和客户端各有一个Queue,那么相应动作将有8种实现,每种数据类型都对应有这8种Queue操作实现。但如果以后再拓展新的数据类型,这显然是很琐碎可怕的事情。如果试图“穷尽”这些组合数量,那是一种非常“愚笨”的办法,也永远无法从编程中获得乐趣。而且,这样“穷尽”以后的代码会变得琐碎而复杂,如果有新的数据类型添加,将导致系统愈加复杂。
当事情第一次发生时尽管如实描述它,第二次同样发生时就要引起警觉,第3次同样发生时,就该重新设计或重整(Refactorying)了。
重新仔细研究整个流程,会发现有一些可以简化的环节。其中一个最容易想到的是:将数据类型简化,都以普通对象Object来替代。这只是通过变更需求来达到简化设计的目的,还是没有找到一种可重用或通用的解决方案。何况将字符串以普通对象替代,在对象序列化时,将难以指定变换字符集。
另外可以接受的简化环节是:当Socket的非堵塞I/O从Queue中提取数据对象时,无法知道这些对象的具体数据类型,但是,可以肯定的是,这些对象是供非堵塞I/O使用的,而非堵塞I/O只会使用到可序列化的数据类型。因此,只要这些提取的对象是一种可序列化的数据类型就可以。InputStream和OutputStream是流类型的类,是典型的序列化特征数据类型。那么,如果将Queue中对象都明确为InputStream或OutputStream类,这样将简化图1-8中的A和B两个数据类型转换部分。
下面就如何精简实现A和B部分做进一步讨论,以客户端的UDPClient为例,修改其doKey方法如下,其中messageQueue就是前面介绍的MessageList实例:
private void doKey(SelectionKey key) throws Exception {
DatagramChannel keyChannel = null;
try {
keyChannel = (DatagramChannel) key.channel();
if (key.isReadable()) { //如果可以从服务器读取response数据
Debug.logVerbose("get response from the Server", module);
//从服务器读取Response后放入Queue
byte[] array = new byte[1024];
ByteBuffer buffer = ByteBuffer.wrap(array);
keyChannel.receive(buffer);
InputStream bin = new ByteArrayInputStream(array);
//将ByteArrayInputStream实例放入Queue
messageQueue.pushResponse(bin);
//处理结束
key.interestOps(SelectionKey.OP_WRITE);
selector.wakeup();
} else if (key.isWritable()) { //如果可以向服务器发送request数据
Debug.logVerbose("-->begin to send request", module);
//从Queue中取出ByteArrayOutputStream
ByteArrayOutputStream outByte = (ByteArrayOutputStream) messageQueue
removeReqFirst();
ByteBuffer buffer = ByteBuffer.wrap(outByte.toByteArray());
keyChannel.write(buffer);
key.interestOps(SelectionKey.OP_READ);
selector.wakeup();
}
} catch (Exception e) {
Debug.logError("run error:" + e, module);
close(keyChannel);
throw new Exception(e);
}
}
从上面代码看出,客户端Queue中统一放置对象类型都是ByteArrayInputStream和ByteArrayOutputStream,请求信号的Queue中放置的是ByteArrayOutputStream;而响应信号Queue中放置的是ByteArrayInputStream。在服务器端,Queue中也统一为这两种对象类型,以UDPHanlder的读写方法为例,如下:
private void read() throws Exception {
try {
Debug.logVerbose("-- > read request from client", module);
byte[] array = new byte[1024];
ByteBuffer buffer = ByteBuffer.wrap(array);
address = datagramChannel.receive(buffer);
InputStream bin = new ByteArrayInputStream(array);
messageQueue.pushRequest(bin); //放入ByteArrayInputStream
state = SENDING;
key.interestOps(SelectionKey.OP_WRITE);
} catch (Exception ex) {
Debug.logError("readRequest .. error:" + ex, module);
}
}
private void send() throws Exception {
try {
Debug.logVerbose("-- > send response to server", module);
//从Reponse Queue中取出ByteArrayOutputStream
ByteArrayOutputStream bout =
(ByteArrayOutputStream)messageQueue.removeResFirst();
ByteBuffer buffer1 = ByteBuffer.wrap(bout.toByteArray());
datagramChannel.send(buffer1, address);
state = READING;
key.interestOps(SelectionKey.OP_READ);
} catch (Exception ex) {
Debug.logError("readRequest .. error:" + ex, module);
}
}
在Socket发出和接受时,解决了图1-8中A和B的数据转换问题,下一步需要解决图1-8中放入Queue的C处和从Queue中取出的D处的数据转换问题,这两点是应用层操作点。
因为现在已经规定Queue中放置的是ByteArrayInputStream和ByteArrayOutputStream两种数据类型,那么,应用层将数据对象放入Queue中时,必须同时实现将该对象转换成ByteArrayOutputStream类型。同样,取出时,需要将Queue中ByteArrayOutputStream转换成原来的对象类型。
这就需要在C和D处完成一个转换接口,它们对应的所需要转换的对象类型可能是多种的。如String类型或普通Object类型。因为应用层可能是各种应用,包括可能需要传送的视频或声频对象。如何针对这样一个集合中各种对象类型分别实行转换,然后放入Queue中?当然,这里希望找到一种优雅、可以通用的解决方案,而不是使用IF语句穷尽所有对象类型实行繁琐的判断处理。
1.4.3 访问者模式实现
访问者模式提供了一种针对集合中各种元素实现统一访问的解决模式,在本系统中,接口层针对各种可能的应用层设计,面对的是各种可能数据对象。如何访问这些数据对象,然后将它们统一转换成可以放入Queue的ByteArrayOutputStream类型?其他Queue的操作也能采取这种方案统一解决。
访问者模式的使用关键是确定访问者和被访问者的角色。在这里,被访问者是应用层需要放入Queue中的各种数据对象,而访问者访问这些数据对象有两个操作行为:放入Queue中和从Queue中取出。
那么,设计访问者接口如下:
public interface QueueWorker {
final static int REQUEST = 1;
final static int RESPONSE = 2;
final static MessageQueue messageQueue = MessageQueue.getInstance();
public void run(int msgType, Linkable object) throws Exception;
}
访问者是一个Queue操作工,它访问各种数据对象,然后实行对Queue的操作,被访问者是那些数据对象,建立被访问者接口如下:
public interface Linkable {
public void accpet(QueueWorker worker) throws Exception;
public OutputStream getOutputStream();
public void setInputStream(InputStream in);
}
被访问者是一种可实行LinkerList操作的接口,有一个统一接受访问的方法accept。
下面需要具体实现被访问者:
以String类型数据对象为具体元素,代码如下:
public class StringType implements Linkable {
private String content = null;
private int msgType; //是Request信号还是Response信号
private ByteBuffer byteBuffer = null;
public StringType(int msgType) {
this.msgType = msgType;
}
public String getContent() {
return content;
}
public void setContent(String content){
this.content = content;
}
public void accpet(QueueWorker worker) throws Exception {
worker.run(msgType, this);
}
public OutputStream getOutputStream() {
OutputStream outputStream = null;
try {
//将String转换成ByteArrayOutputStream
outputStream = DataTypeHelper.writeString(content);
} catch (Exception ex) {
}
return outputStream;
}
public void setInputStream(InputStream in) {
try {
//将ByteArrayInputStream转换成String
this.content = DataTypeHelper.getString(in);
} catch (Exception ex) {
}
}
}
其他数据类型如Object等可参考String的实现方法。
访问者QueueWorker有两种具体实现,分别是加入Queue的操作工QueueAddWorker和从Queue中取出元素的操作工QueueTakeWorker。QueueAddWorker代码如下:
public class QueueAddWorker implements QueueWorker{
public void run(int msgType, Linkable object) throws Exception {
//获得元素转换后的outputStream对象
OutputStream outputStream = object.getOutputStream();
if (msgType == REQUEST) {//根据不同的消息类型放入不同的Queue中
messageQueue.pushRequest(outputStream);
} else if (msgType == RESPONSE) {
messageQueue.pushResponse(outputStream);
}
}
}
QueueTakeWorker的代码如下:
public class QueueTakeWorker implements QueueWorker{
public void run(int msgType, Linkable object) throws Exception {
InputStream bin = null;
OutputStream outputStream = object.getOutputStream();
if (msgType == REQUEST) {
bin = (InputStream) messageQueue.removeReqFirst();
} else if (msgType == RESPONSE) {
bin = (InputStream) messageQueue.removeResFirst();
}
//将取出的InputStream类型对象实行各自转换
object.setInputStream(bin);
bin.close();
}
}
以上代码见CD程序1-8。
至此,实现访问者模式的数据转换和Queue操作模块基本完成。在这个转换模块中,针对请求Request信号,根据不同的数据类型,转换成统一的类型放入Queue中,然后再从Queue中取出,转换成各自的数据类型;针对Response信号实现了同样的情况,这样在客户端和服务器端都统一调用这个模块,实现了统一的数据类型。
当然,使用访问者模式后,相关的代码可能变多了。如果在这里不使用模式,可能是只有很少的几行代码,但必须进行详细的注解说明。如果应用层使用的是String类型应该怎样做;使用XXX类型应该怎样做,需要明确告知Queue中放置的是InputStream和OutputStream型,相当于在代码这里插入一个“使用说明书”,这样其他程序员扩展新的应用时才能够明白。
相反,如果在这里使用了模式,相应的警示性代码说明就不需要。因为代码质量已经提高,在访问者模式中封装了Queue的类型放置和提取,对于其他程序员再次使用要方便容易得多。
推而广之,在很多情况下,都存在两种实体:事物流程和事物流程的管理。事物流程的管理是对事物流程实现管理监督的,无论再好的事物流程管理,都没有事物流程本身实现自动自律管理来得更加有效。这也是为什么采取软件系统实现很多处理过程自动化的目的,因为软件系统本身具有强迫性,可以迫使很多管理目的能够非常自然地实现。
1.4.4 协议封装
前面架构设计中已经分析过,目前HTTP协议是防火墙友好型协议,可以穿透大多数防火墙而不被拦截,因此数据包发送可以使用HTTP协议进行包装后再发送。
在本系统中,由于应用层可能有不同的数据类型需要实现Socket发送和接受。因此,如果将数据协议包装工作设定在接口层和应用层之间,势必又带来了数据类型问题的困扰,因此明智之举是放在接口层和Socket之间。当Socket底层从接口层的Queue中获取数据对象后,再进行协议包装,然后通过I/O发送,服务器方在Socket I/O接受到数据后,去除协议包装,再将正文内容放入接口层Queue中。
目前是采取HTTP协议,考虑到为其他协议留下扩展的余地,有可能根据不同的应用,要增加新的协议,所以协议和数据的包装过程必须是能够替换或者扩展的。那么现在需要将这个包装过程封装起来,和外界包括Socket I/O部分切断过多的联系。这样在扩展协议时,才不会需要修改Socket I/O部分的代码。
工厂方法模式是封装创建过程的模式。在本例中,协议和数据结合的传送数据包是最终产品,而这个结果过程需要封装。工厂方法模式正好可以解决这个问题。关于工厂模式的更多研究和讨论见以后章节。
建立一个WrapFactory抽象类,专门是实现协议和数据结合包装的类,代码如下(程序1-9):
程序1-9
/**
* 数据包装工厂
* 默认是使用HTTP协议包装数据,也可以拓展成其他协议
* <p>Copyright: Jdon.com Copyright (c) 2003</p>
* <p>Company: 上海解道计算机技术有限公司</p>
* @author banq
* @version 1.0
*/
public abstract class WrapFactory {
private static WrapFactory factory = new com.jdon.jserver.http.HttpWrapFactory();
public static WrapFactory getInstance() {
return factory;
}
public abstract byte[] getRequest(byte[] bytes);
public abstract byte[] getResponse(byte[] bytes);
public abstract byte[] getContentFromRequest(byte[] bytes) throws Exception;
public abstract byte[] getContentFromResponse(byte[] bytes) throws Exception;
}
在这个抽象类中,需要实现具体4个行为:获得包含协议的请求信号数据包;获得包含协议的响应信号数据包;从请求数据包中分离获得正文;从响应数据包中分离获得正文。
无论采取哪个协议,这4个行为都是必须要实现的。该抽象类默认是以HttpWrapFactory为具体实现,HttpWrapFactory是HTTP协议的数据封装实现。代码如下:
public class HttpWrapFactory extends com.jdon.jserver.connector.WrapFactory {
private String httpPOSTHeader = null;
public HttpWrapFactory() {
httpPOSTHeader = HttpHeadHelper.getPOSTHeader("", 0);
}
public byte[] getRequest(byte[] bytes){
return HttpHeadHelper.assembleRequest(bytes);
}
public byte[] getResponse(byte[] bytes){
return HttpHeadHelper.assembleResponse(bytes);
}
public byte[] getContentFromRequest(byte[] bytes) throws Exception{
return HttpHeadHelper.getContent(bytes);
}
public byte[] getContentFromResponse(byte[] bytes) throws Exception{
return HttpHeadHelper.getContent(bytes);
}
}
HttpWrapFactory将每个行为具体落实又委托给帮助类HttpHeadHelper实现,在HttpHeadHelper中具体实现数据正文和HTTP协议头部组合过程,以及从数据包中分离开HTTP协议头部以及数据正文等过程。
那么具体如何使用这个WrapFactory?以UDPClient为例子,将其Socket读写部分的代码改写为如下:
ByteArrayOutputStream outByte =
(ByteArrayOutputStream) messageQueue.removeReqFirst();
//下面是增加的一句
byte[] request = wrapFactory.getRequest(outByte.toByteArray());
ByteBuffer buffer = ByteBuffer.wrap(request);
keyChannel.write(buffer);
原来的代码是直接从Queue中取出后,转为byte数组直接包装进入ByteBuffer发送出去,现在在进入ByteBuffer之前,进行协议封装,使用WrapFactory对象的getRequest方法,获得一个包含HTTP协议头部信息的总数据包,然后再将这个总数据包发送出去。