上一篇博客我们说了Java基础中BIO、NIO、AIO,其中用NIO和AIO开发中需要处理的东西比较多,Netty是IO这方面优秀的框架,我们来看个Netty的入门,以及其解决IO传输中的比较恶心的粘包和拆包问题。
首先我们到http://netty.io/ 去下载netty的jar包,我是通过Maven下载了5.0.0.Alpha2版本,然后我们看下入门实例。注意其中的几个点即可:1,服务端:ServerBootstrap、父子线程组、继承ChannelHandlerAdapter、重写channelRead、channelReadComplete、exceptionCaught;2,客户端:Bootstrap、线程组、继承ChannelHandlerAdapter、重写channelActive、channelRead、exceptionCaught。
实例:
public class TimeServer {
/**
* init一些thread、server等
* @param port
*/
public void bind(int port){
//配置服务端的NIO Reactor线程组
EventLoopGroup bossGroup = new NioEventLoopGroup();
EventLoopGroup workerGroup = new NioEventLoopGroup();
try {
ServerBootstrap b =new ServerBootstrap();
//绑定两个线程组,并设置IO事件的处理类,ChildChannelHandler
b.group(bossGroup,workerGroup).channel(NioServerSocketChannel.class).option(ChannelOption.SO_BACKLOG,1024).childHandler(new ChildChannelHandler());
//绑定端口,同步等待成功
ChannelFuture f = b.bind(port).sync();
//等待服务器监听端口关闭
f.channel().closeFuture().sync();
} catch (InterruptedException e) {
e.printStackTrace();
}finally {
//优雅退出,释放线程组
bossGroup.shutdownGracefully();
workerGroup.shutdownGracefully();
}
}
/**
* 子线程功能处理
*/
private class ChildChannelHandler extends ChannelInitializer<SocketChannel>{
@Override
protected void initChannel(SocketChannel socketChannel) throws Exception {
//指定handler
socketChannel.pipeline().addLast(new TimeServerHandler());
}
}
public static void main(String[] args) {
int port = 8090;
if(args !=null && args.length>0){
try{
port = Integer.valueOf(args[0]);
}catch (NumberFormatException e){
}
}
new TimeServer().bind(port);
}
}
public class TimeServerHandler extends ChannelHandlerAdapter {
/**
* 异常处理
* @param ctx
* @param cause
* @throws Exception
*/
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
ctx.close();
}
/**
* 处理逻辑
* @param ctx
* @param msg
* @throws Exception
*/
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
ByteBuf buf = (ByteBuf) msg;
byte[] req = new byte[buf.readableBytes()];
buf.readBytes(req);
String body = new String(req, "UTF-8");
System.out.println("The time server receive order : " + body);
String currentTime = "QUERY TIME ORDER".equalsIgnoreCase(body) ? new Date(System.currentTimeMillis()).toString() : "BAD ORDER";
ByteBuf resp = Unpooled.copiedBuffer(currentTime.getBytes());
ctx.write(resp);
}
/**
* 处理完成后操作
* @param ctx
* @throws Exception
*/
@Override
public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
ctx.flush();
}
}
public class TimeClient {
/**
* connect server
* @param port
* @param host
*/
public void connect(int port , String host){
EventLoopGroup group =new NioEventLoopGroup();
try {
Bootstrap b =new Bootstrap();
b.group(group).channel(NioSocketChannel.class).option(ChannelOption.TCP_NODELAY,true).handler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel socketChannel) throws Exception {
socketChannel.pipeline().addLast(new TimeClientHandler());
}
});
//发起异步连接操作
ChannelFuture f = b.connect(host,port).sync();
f.channel().closeFuture().sync();
} catch (InterruptedException e) {
e.printStackTrace();
}finally {
group.shutdownGracefully();
}
}
public static void main(String[] args) {
int port = 8090;
if (args != null && args.length > 0) {
try {
port = Integer.valueOf(args[0]);
} catch (NumberFormatException e) {
}
}
new TimeClient().connect(port,"127.0.0.1");
}
}
public class TimeClientHandler extends ChannelHandlerAdapter {
private static final Logger logger = Logger.getLogger(TimeClientHandler.class.getName());
private final ByteBuf firstMessage;
/**
* init
*/
public TimeClientHandler(){
byte[] req = "QUERY TIME ORDER".getBytes();
firstMessage = Unpooled.buffer(req.length);
firstMessage.writeBytes(req);
}
/**
* some exception appear
* @param ctx
* @param cause
* @throws Exception
*/
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
logger.warning("Unexpected exception from downstream :" + cause.getMessage());
ctx.close();
}
/**
* 1,connect success first do
* @param ctx
* @throws Exception
*/
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
ctx.writeAndFlush(firstMessage);
}
/**
* 2,receive data response to handler
* @param ctx
* @param msg
* @throws Exception
*/
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
ByteBuf buf = (ByteBuf) msg;
byte[] req = new byte[buf.readableBytes()];
buf.readBytes(req);
String body = new String (req,"UTF-8");
System.out.println("Now is : " + body);
}
}
好,Demo看完后,我们想TCP粘包和拆包问题,首相TCP流协议不关心业务,但是TCP缓冲区是有固定大小的,由于发送业务的大小和缓冲器的大小不一,就会出现一个完整包被拆成几个包,也可能几个包被合并进行处理,这就是粘包(合并)、拆包(拆分)。下边通过这张图,看下其中的几种情况:
出现的情况:
1,服务端,分两次读取到两个独立的数据包,分别D1和D2,没有粘包和拆包;
2,服务端一次接受两个数据包,D1和D2粘合在一起,被称为TCP粘包;
3,服务端分两次读取到两个数据包,第一次为D1包和D2的部分,第二次为D2的剩余内容,被称为TCP拆包;
4,服务端分两次读取到两个数据包,第一次为D1的部分内容,第二次为D1包的剩余内容和D2的包。
解决思路:
1,消息定长,例如每个报文的大小为固定长度200字节,如果不够空位补空格;
2,在包尾增加回车换行符进行分割,例如FTP协议;
3,将消息分为消息头和消息体,消息头中包含表示消息总长度(或者消息体长度)的字段;
4,更复杂的应用层协议。
Netty通过什么解决这个问题呢?
1,LineBasedFrameDecoder:处理数据中有“\n”、“\r\n”的位置进行换行,支持配置单行的最大长度,如果连续读取到最大长度还是没有发现换行符,则剖出异常,并忽略之前读到的异常码流。
2,DelimiterBasedFrameDecoder:可以自动完成以分隔符做结束标志的消息的解码。
3,FixedLengthFrameDecoder:可以自动完成对定长消息的解码。
三者的用法类似,我们看一个例子即可:
public class EchoServer {
public void bind(int port) {
EventLoopGroup bossGroup = new NioEventLoopGroup();
EventLoopGroup workerGroup = new NioEventLoopGroup();
try {
ServerBootstrap b = new ServerBootstrap();
b.group(bossGroup, workerGroup).channel(NioServerSocketChannel.class).option(ChannelOption.SO_BACKLOG, 100).handler(new LoggingHandler(LogLevel.INFO)).childHandler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel socketChannel) throws Exception {
//注意几种解码器
//socketChannel.pipeline().addLast(new LineBasedFrameDecoder(1024));
ByteBuf delimiter = Unpooled.copiedBuffer("$_".getBytes());
//socketChannel.pipeline().addLast(new DelimiterBasedFrameDecoder(1024, delimiter));
socketChannel.pipeline().addLast(new FixedLengthFrameDecoder(20));
socketChannel.pipeline().addLast(new StringDecoder());
socketChannel.pipeline().addLast(new EchoServerHandler());
}
});
ChannelFuture f = b.bind(port).sync();
f.channel().closeFuture().sync();
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
bossGroup.shutdownGracefully();
workerGroup.shutdownGracefully();
}
}
public static void main(String[] args) {
int port = 8090;
if (args != null && args.length > 0) {
try {
port = Integer.valueOf(args[0]);
} catch (NumberFormatException e) {
}
}
new EchoServer().bind(port);
}
}
public class EchoServerHandler extends ChannelHandlerAdapter {
int counter = 0;
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
cause.printStackTrace();
ctx.close();
}
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
String body = (String) msg;
System.out.println("This is" + ++counter + "times receive client : [" + body + "]");
body += "$_";
ByteBuf echo = Unpooled.copiedBuffer(body.getBytes());
ctx.writeAndFlush(echo);
}
}
public class EchoClient {
public void connect(int port, String host) {
EventLoopGroup group = new NioEventLoopGroup();
try {
Bootstrap b = new Bootstrap();
b.group(group).channel(NioSocketChannel.class).option(ChannelOption.TCP_NODELAY, true).handler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel socketChannel) throws Exception {
//几种编码器
//socketChannel.pipeline().addLast(new LineBasedFrameDecoder(1024));
ByteBuf delimiter = Unpooled.copiedBuffer("$_".getBytes());
//socketChannel.pipeline().addLast(new DelimiterBasedFrameDecoder(1024,delimiter));
socketChannel.pipeline().addLast(new FixedLengthFrameDecoder(20));
socketChannel.pipeline().addLast(new StringDecoder());
socketChannel.pipeline().addLast(new EchoClientHandler());
}
});
ChannelFuture f = b.connect(host, port).sync();
f.channel().closeFuture().sync();
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
group.shutdownGracefully();
}
}
public static void main(String[] args) {
int port = 8090;
if (args != null && args.length > 0) {
try {
port = Integer.valueOf(args[0]);
} catch (NumberFormatException e) {
}
}
new EchoClient().connect(port, "127.0.0.1");
}
}
public class EchoClientHandler extends ChannelHandlerAdapter {
private int counter;
static final String ECHO_REQ = "HI,LIUJIAHAN,WELCOME TO STUDY NETTY.$_";
public EchoClientHandler() {
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
cause.printStackTrace();
ctx.close();
}
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
for (int i = 0; i < 100; i++) {
ctx.writeAndFlush(Unpooled.copiedBuffer(ECHO_REQ.getBytes()));
}
}
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
System.out.println("this is " + ++counter + "times receive server:[" + msg + "]");
}
@Override
public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
ctx.flush();
}
}
好,这篇是Netty的基础入门,还有粘包和拆包Netty提供的基本解决方案,下边我们会更深入的学习。