转:https://blog.csdn.net/qq_18603599/article/details/80768402
上一章节,介绍了netty的服务端和客户端初始化过程并且最后还使用了一个代码实现了netty的入门编程,那么从本章开始要陆续介绍和netty编程中使用到的相关知识,今天要介绍的知识点如下
1 Netty的tcp的粘包
2 Netty的tcp的拆包
3 Netty的tcp的粘包,拆包结局方案
OK 接下来开始一个一个来说,首先对于一个正常的TCP发送数据和接受数据会产生以下几种情况...
第一种情况,接收端正常收到两个数据包,即没有发生拆包和粘包的现象
第二中情况,接收端只收到一个数据包,由于TCP是不会出现丢包的,所以这一个数据包中包含了发送端发送的两个数据包的信息,这种现象即为粘包
第三种情况,接收端收到了两个数据包,但是这两个数据包要么是不完整的,要么就是多出来一块,这种情况即发生了拆包和粘包
粘包原因:
>要发送的数据小于TCP发送缓冲区的大小,TCP将多次写入缓冲区的数据一次发送出去,将会发生粘包:发送数据小于缓冲区
>接收数据端的应用层没有及时读取接收缓冲区中的数据,将发生粘包:读取数据超时
拆包原因:
>要发送的数据大于TCP发送缓冲区剩余空间大小,将会发生拆包:发送数据大于缓冲区大小
>待发送数据大于最大报文长度也会发生拆包:发送数据大于报文长度:数据大于报文长度
总结:不管是拆包,还是粘包都是在发送数据,数据缓冲区,以及报文长度这三者之间满足某种关系而发生的一种现象
解决方案:
固定数据包长度:发送端把数据包封装成固定长度,连接端每次从接收缓冲区读取固定长度数据包
添加长度首部:给每个数据包添加包首部,首部中应该至少包含数据包的长度,接收端在接收到数据后,通过读取包首部的长度字段设置边界:在数据包之间设置边界,如添加特殊符号,这样,接收端通过这个边界就可以将不同的数据包拆分开
Netty中的实现方案:
其实在netty已经提供了几种默认实现好的方案,开发者只需要了解每一种是什么作用,然后根据业务需求选择对应的实现即可,下面列举出默认的各种实现类和对应的功能
DelimiterBasedFrameDecoder:分隔符解码器
LineBasedFrameDecoder:回车换行解码器
FixedLengthFrameDecoder:固定长度解码器
LengthFieldBasedFrameDecoder:用于标识消息体或者整包消息的长度>解决'读半包'
上面说的基本上都是理论,接下来进入实践环节,接下来通过代码演示一下什么叫拆包和粘包,首先演示'粘包'例子很简单,就是客户端连接上服务端之后发送一句话'烧烤小分队',客户端循环重复发送,服务端接受并累加次数.看代码,为了方便演示,把和客户端相关的类都放在客户端,服务端都放在服务端,具体请看代码,首先看客户端的代码
package client;
import io.netty.bootstrap.Bootstrap;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.LineBasedFrameDecoder;
import io.netty.handler.codec.string.StringDecoder;
import io.netty.handler.codec.string.StringEncoder;
import java.util.logging.Logger;
/**
* 模拟发送数据的客户端
*/
public class CounterClient {
public void connect(String host,int port)throws Exception{
// 配置服务端的NIO线程组
EventLoopGroup group = new NioEventLoopGroup();
try {
// Bootstrap 类,是启动NIO服务器的辅助启动类
Bootstrap b = new Bootstrap();
b.group(group).channel(NioSocketChannel.class)
.option(ChannelOption.TCP_NODELAY,true)
.handler(new ChannelInitializer<SocketChannel>() {
@Override
public void initChannel(SocketChannel ch) throws Exception{
ch.pipeline().addLast(new StringEncoder());
ch.pipeline().addLast(new StringDecoder());
ch.pipeline().addLast(new CounterClientHandler());
}
});
// 发起异步连接操作
ChannelFuture f= b.connect(host,port).sync();
// 等待客服端链路关闭
f.channel().closeFuture().sync();
}finally {
group.shutdownGracefully();
}
}
public static void main(String[]args)throws Exception{
int port = 8080;
if(args!=null && args.length>0){
try {
port = Integer.valueOf(args[0]);
}
catch (NumberFormatException ex){}
}
new CounterClient().connect("127.0.0.1",port);
}
/**
* 客户端发送数据的handler
*/
class CounterClientHandler extends ChannelInboundHandlerAdapter {
//统计频率
private int counter;
public void channelRead(ChannelHandlerContext ctx, Object msg)
throws Exception{
String body = (String)msg;
System.out.println("当前时间是 : " + body+";当前频率是 : "+ ++counter);
}
/**
* 客户端连接上服务端之后会调用该方法
* @param ctx
*/
public void channelActive(ChannelHandlerContext ctx){
ByteBuf firstMessage=null;
String value = "烧烤小分队";
String data = value;
for (int i=0;i<3;i++){
firstMessage = Unpooled.buffer(data.getBytes().length);
firstMessage.writeBytes(data.getBytes());
ctx.writeAndFlush(firstMessage);
}
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause){
System.out.println(("message from:"+cause.getMessage()));
ctx.close();
}
}
}
服务端的代码:
package server;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.codec.LineBasedFrameDecoder;
import io.netty.handler.codec.string.StringDecoder;
import io.netty.handler.codec.string.StringEncoder;
import java.util.Date;
/**
* 模拟词频统计的服务端
*/
public class CounterServer {
public void bind(int port)throws Exception{
// 网络读写
EventLoopGroup bossGroup = new NioEventLoopGroup();
EventLoopGroup WorkerGroup = new NioEventLoopGroup();
try {
// ServerBootstrap 类,是启动NIO服务器的辅助启动类
ServerBootstrap b = new ServerBootstrap();
b.group(bossGroup,WorkerGroup)
.channel(NioServerSocketChannel.class)
.option(ChannelOption.SO_BACKLOG,1024)
.childHandler(new ChildChannelHandler());
// 绑定端口,同步等待成功
ChannelFuture f= b.bind(port).sync();
// 等待服务端监听端口关闭
f.channel().closeFuture().sync();
}finally {
// 释放线程池资源
bossGroup.shutdownGracefully();
WorkerGroup.shutdownGracefully();
}
}
/**
* 初始化channel的handler
*/
private class ChildChannelHandler extends ChannelInitializer<SocketChannel> {
@Override
protected void initChannel(SocketChannel ch)throws Exception{
ch.pipeline().addLast(new StringEncoder());
ch.pipeline().addLast(new StringDecoder());
ch.pipeline().addLast(new CounterServerHandler());
}
}
/**
* 法务端业务处理的handler
*/
public class CounterServerHandler extends ChannelInboundHandlerAdapter {
private int counter;
// 用于网络的读写操作
public void channelRead(ChannelHandlerContext ctx, Object msg)
throws Exception{
String body = (String)msg;
System.out.println("法务端接受到的数据 : " + body+";累加频率是:"+ (++counter));
String currentTime = "烧烤小分队".equalsIgnoreCase(body)?new Date(System.currentTimeMillis()).toString():"数据接收不正确";
ByteBuf resp = Unpooled.copiedBuffer(currentTime.getBytes());
ctx.writeAndFlush(resp);
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause){
ctx.close();
}
}
public static void main(String[]args)throws Exception{
int port = 8080;
if(args!=null && args.length>0){
try {
port = Integer.valueOf(args[0]);
}
catch (NumberFormatException ex){}
}
new CounterServer().bind(port);
}
}
代码很简单,主要看一下效果,先启动服务端,再启动客户端,最终效果如下
OK 问题出现了就想解决办法,刚刚说了netty已经提供了默认了解决实现方案,接下来就修改一下代码,解决思路有很多种,这里使用换行符来搞定,就是每发送一句话,会在后面加上一个换行符,这样服务端就会根据换行符来解析数据,获取的数据结果自然就是正确的。
看修改之后的客户端代码,其实很多和上一个代码是一致的,在这里为了更好的区分,又重新建立一个类,方便看对比效果
package client;
import io.netty.bootstrap.Bootstrap;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.LineBasedFrameDecoder;
import io.netty.handler.codec.string.StringDecoder;
import io.netty.handler.codec.string.StringEncoder;
/**
* 模拟发送数据的客户端
*/
public class CounterResovlerPacketClient {
public void connect(String host,int port)throws Exception{
// 配置服务端的NIO线程组
EventLoopGroup group = new NioEventLoopGroup();
try {
// Bootstrap 类,是启动NIO服务器的辅助启动类
Bootstrap b = new Bootstrap();
b.group(group).channel(NioSocketChannel.class)
.option(ChannelOption.TCP_NODELAY,true)
.handler(new ChannelInitializer<SocketChannel>() {
@Override
public void initChannel(SocketChannel ch) throws Exception{
//增加换行解码器
ch.pipeline().addLast(new LineBasedFrameDecoder(1024));
ch.pipeline().addLast(new StringDecoder());
ch.pipeline().addLast(new CounterClientHandler());
}
});
// 发起异步连接操作
ChannelFuture f= b.connect(host,port).sync();
// 等待客服端链路关闭
f.channel().closeFuture().sync();
}finally {
group.shutdownGracefully();
}
}
public static void main(String[]args)throws Exception{
int port = 8080;
if(args!=null && args.length>0){
try {
port = Integer.valueOf(args[0]);
}
catch (NumberFormatException ex){}
}
new CounterResovlerPacketClient().connect("127.0.0.1",port);
}
/**
* 客户端发送数据的handler
*/
class CounterClientHandler extends ChannelInboundHandlerAdapter {
//统计频率
private int counter;
public void channelRead(ChannelHandlerContext ctx, Object msg)
throws Exception{
String body = (String)msg;
System.out.println("客户端接受服务度返回的数据,当前时间是 : " + body+";当前频率是 : "+ ++counter);
}
/**
* 客户端连接上服务端之后会调用该方法
* @param ctx
*/
public void channelActive(ChannelHandlerContext ctx){
ByteBuf firstMessage=null;
String value = "烧烤小分队带你走上IT巅峰";
//这里加了一个换行符,为了让服务端进行按行读取数据的依据
String data = value + "\n";
for (int i=0;i<10;i++){
firstMessage = Unpooled.buffer(data.getBytes().length);
firstMessage.writeBytes(data.getBytes());
ctx.writeAndFlush(firstMessage);
}
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause){
System.out.println(("message from:"+cause.getMessage()));
ctx.close();
}
}
}
再看一下服务端的代码:
package server;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.codec.LineBasedFrameDecoder;
import io.netty.handler.codec.string.StringDecoder;
import io.netty.handler.codec.string.StringEncoder;
import java.util.Date;
/**
* 模拟词频统计的服务端
*/
public class CounterResovlerPacketServer {
public void bind(int port)throws Exception{
// 网络读写
EventLoopGroup bossGroup = new NioEventLoopGroup();
EventLoopGroup WorkerGroup = new NioEventLoopGroup();
try {
// ServerBootstrap 类,是启动NIO服务器的辅助启动类
ServerBootstrap b = new ServerBootstrap();
b.group(bossGroup,WorkerGroup)
.channel(NioServerSocketChannel.class)
.option(ChannelOption.SO_BACKLOG,1024)
.childHandler(new ChildChannelHandler());
// 绑定端口,同步等待成功
ChannelFuture f= b.bind(port).sync();
// 等待服务端监听端口关闭
f.channel().closeFuture().sync();
}finally {
// 释放线程池资源
bossGroup.shutdownGracefully();
WorkerGroup.shutdownGracefully();
}
}
/**
* 初始化channel的handler
*/
private class ChildChannelHandler extends ChannelInitializer<SocketChannel> {
@Override
protected void initChannel(SocketChannel ch)throws Exception{
//增加换行符解码器
ch.pipeline().addLast(new LineBasedFrameDecoder(1024));
ch.pipeline().addLast(new StringDecoder());
ch.pipeline().addLast(new CounterServerHandler());
}
}
/**
* 法务端业务处理的handler
*/
public class CounterServerHandler extends ChannelInboundHandlerAdapter {
private int counter;
// 用于网络的读写操作
public void channelRead(ChannelHandlerContext ctx, Object msg)
throws Exception{
String body = (String)msg;
System.out.println("服务端接受到的数据 : " + body+";累加频率是:"+ (++counter));
String currentTime = "烧烤小分队带你走上IT巅峰".equalsIgnoreCase(body)?new Date(System.currentTimeMillis()).toString():"错误数据";
currentTime +=System.getProperty("line.separator");//按换行符进行数据的读取
ByteBuf resp = Unpooled.copiedBuffer(currentTime.getBytes());
ctx.writeAndFlush(resp);
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause){
ctx.close();
}
}
public static void main(String[]args)throws Exception{
int port = 8080;
if(args!=null && args.length>0){
try {
port = Integer.valueOf(args[0]);
}
catch (NumberFormatException ex){}
}
new CounterResovlerPacketServer().bind(port);
}
}
这时候重复模拟发送10篇数据,再看一下效果:
客户端的效果:
注意:LineBaseFrameDecoder+StringDecoder组合,设置的顺序一定要LineBaseFrameDecoder在StringDecoder前面,否则这个解码器没有效果,一定要留心
这时候发现频率统计是正确的,且数据也不会再粘在一起,这样就解决了'粘包'的问题,当然这这是一种方式,还可以选择其他实现方式,因为原理都差不多,这里就不多说了,接下来再看另一种问题,就是'拆包',其实要实现这个效果很简单就是把LineBaseFrameDecoder(1024)可以该小点,然后增加要发送字符串的长度,就会自然而然出现拆包现象,然而解决方式可以通过发送固定长度的字符串,然后使用FixedLengthFrameDecoder来解析即可,看个简单的例子 比如客户端发送'this is hello word please come to me',这个时候如果使用FixedLengthFrameDecoder且设置长度为10,并发生了'拆包',效果如下:
很显然把一句完整的话给拆分了,那么如果不想拆分,只需要把长度设置和发送字符串的长度一样即可:简单通过代码演示一下:
客户端代码:
package client;
import io.netty.bootstrap.Bootstrap;
import io.netty.buffer.Unpooled;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.FixedLengthFrameDecoder;
import io.netty.handler.codec.string.StringDecoder;
import java.net.InetSocketAddress;
/**
* Created jhp
*/
public class SplitPacketClient {
private final String host;
private final int port;//定义服务器端监听的端口
/** 构造函数中传入参数 **/
public SplitPacketClient(String host, int port){
this.host = host;
this.port = port;
}
/** 启动服务器 **/
public void start() throws Exception{
EventLoopGroup group = new NioEventLoopGroup();
//创建一个client 的bootstrap实例
Bootstrap clientBootstrap = new Bootstrap();
try {
clientBootstrap.group(group)
.channel(NioSocketChannel.class)//指定使用一个NIO传输Channel
.remoteAddress(new InetSocketAddress(host, port))//设置远端服务器的host和端口
.option(ChannelOption.TCP_NODELAY, true)
.handler(new ChannelInitializer<SocketChannel>() {
//在channel的ChannelPipeline中加入EchoClientHandler到最后
@Override
protected void initChannel(SocketChannel channel) throws Exception {
channel.pipeline().addLast(new FixedLengthFrameDecoder(40));//只要发送的字符串等于这个长度就不会发送拆包
channel.pipeline().addLast(new StringDecoder());
channel.pipeline().addLast(new EchoClientHandler());
}
});
ChannelFuture f = clientBootstrap.connect().sync();//连接到远端,一直等到连接完成
f.channel().closeFuture().sync();//一直阻塞到channel关闭
} finally {
group.shutdownGracefully().sync();//关闭group,释放所有的资源
}
}
/**
* main
* @param args
* @throws Exception
*/
public static void main(String[] args) throws Exception {
new SplitPacketClient("127.0.0.1", 8000).start();
}
class EchoClientHandler extends SimpleChannelInboundHandler<String> {
private int counter=0;
private static final String REQ = "this is hello word please come to me";
/**
* 当收到连接成功的通知,发送一条消息.
* @param ctx
* @throws Exception
*/
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
for(int i=0; i<10; i++){
ctx.writeAndFlush( Unpooled.copiedBuffer(REQ.getBytes()) );
}
}
/**
* 每当收到数据时这个方法会被调用.打印收到的消息日志
* @param channelHandlerContext
* @param msg
* @throws Exception
*/
@Override
protected void channelRead0(ChannelHandlerContext channelHandlerContext, String msg) throws Exception {
System.out.println("client received: " + "counter:" + (++counter) + " msg:"+msg);
}
/**
* 异常发生时,记录错误日志,关闭channel
* @param ctx
* @param cause
* @throws Exception
*/
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
cause.printStackTrace();//打印堆栈的错误日志
ctx.close();
}
}
}
服务端代码:
package server;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.codec.FixedLengthFrameDecoder;
import io.netty.handler.codec.string.StringDecoder;
/**
* Create jhp
*/
public class SplitPacketServer {
private final int port;//定义服务器端监听的端口
/** 构造函数中传入参数 **/
public SplitPacketServer(int port){
this.port = port;
}
/** 启动服务器 **/
public void start() throws Exception{
//县城组
EventLoopGroup boss = new NioEventLoopGroup();
EventLoopGroup worker = new NioEventLoopGroup();
//创建一个serverbootstrap实例
ServerBootstrap serverBootstrap = new ServerBootstrap();
try {
serverBootstrap.group(boss, worker)
.channel(NioServerSocketChannel.class)//指定使用一个NIO传输Channel
.option(ChannelOption.SO_BACKLOG, 100)
.childHandler(new ChannelInitializer<SocketChannel>() {
//在channel的ChannelPipeline中加入EchoServerHandler到最后
@Override
protected void initChannel(SocketChannel channel) throws Exception {
channel.pipeline().addLast(new FixedLengthFrameDecoder(40));
channel.pipeline().addLast(new StringDecoder());
channel.pipeline().addLast(new EchoServerHandler());
}
});
//异步的绑定服务器,sync()一直等到绑定完成.
ChannelFuture future = serverBootstrap.bind(this.port).sync();
System.out.println(SplitPacketServer.class.getName()+" started and listen on '"+ future.channel().localAddress());
future.channel().closeFuture().sync();//获得这个channel的CloseFuture,阻塞当前线程直到关闭操作完成
} finally {
boss.shutdownGracefully().sync();//关闭group,释放所有的资源
worker.shutdownGracefully().sync();//关闭group,释放所有的资源
}
}
/**
* main
* @param args
* @throws Exception
*/
public static void main(String[] args) throws Exception {
new SplitPacketServer(8000).start();
}
class EchoServerHandler extends ChannelInboundHandlerAdapter {
private int counter=0;
/**
* 每次收到消息的时候被调用;
* @param ctx
* @param msg
* @throws Exception
*/
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
String body = (String)msg;
System.out.println("this is:"+ (++counter) +" time." + " Server received: " + body);
ByteBuf echo = Unpooled.copiedBuffer(body.getBytes());
ctx.writeAndFlush(echo);
}
/**
* 在读操作异常被抛出时被调用
* @param ctx
* @param cause
* @throws Exception
*/
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
cause.printStackTrace();//打印异常的堆栈跟踪信息
ctx.close();//关闭这个channel
}
}
}
看一下最终的效果:
完全正确了,OK 到此为止粘包和拆包有关知识就讲完了。。