转:https://blog.csdn.net/qq_18603599/article/details/80810566
前面几个章节算是从头到尾把和netty的相关知识进行了详细的介绍,也做了相关的总结,但是关于netty如何在实践中去使用还没有提及到,今天就谈一下这方面的内容,netty大家都知道被适用于网络通信这块,所以很多RPC框架都有他的身影,而实际中使用netty都需要用到的是'长连接技术',所谓长连接技术,就是客户端和服务端要一直保持联系,而不是发送完请求,就断开了,而之前演示的例子都是基于netty的'短连接',所以如果要在生成环境真正使用netty的话,就必须要掌握如何实现基于netty的长连接.其实如果熟悉dubbo源码的人,就会发现dubbo底层的通信是支持netty的,所以他也很好的实现Netty的长连接,主要是在DefaultFuture这个类里面,下面的代码也是基于这个类的思想来实现的,OK 首先分析一下实现大概思路
1 每个客户端的请求创建一个唯一的标志且是自增的,这个可以使用原子技术
2 请求和响应与自身上下文进行绑定
3 设置处理请求超时的处理线程
4 检查请求超时的请求机制
整体项目代码结构
下面详细介绍代码,实现过程:首先定义两个请求的保存实体,分别是客户端请求和服务端请求(如果不分开的话,可能会出现重复或者请求标志对不上):
package com.netty.bean;
import java.util.concurrent.atomic.AtomicLong;
/**
* Created by jack on 2018/5/5.
* 封装客户端的请求
*/
public class ClientRequest {
//请求命令
private String command="test";
//请求参数
private Object content;
private final long id;
//使用原子技术
private static final AtomicLong al = new AtomicLong(0);
public ClientRequest(){
//请求唯一标识id 每次都会自增加1
id = al.incrementAndGet();
}
public String getCommand() {
return command;
}
public void setCommand(String command) {
this.command = command;
}
public Object getContent() {
return content;
}
public void setContent(Object content) {
this.content = content;
}
public long getId() {
return id;
}
}
封装服务端的请求:
package com.netty.bean;
/**
* Created by jack on 2018/5/5.
* 封装服务端的请求
*/
public class ServerRequest {
private String command;
private Object content;
private long id;
public long getId() {
return id;
}
public void setId(long id) {
this.id = id;
}
public Object getContent() {
return content;
}
public void setContent(Object content) {
this.content = content;
}
public String getCommand() {
return command;
}
public void setCommand(String command) {
this.command = command;
}
@Override
public String toString() {
System.out.println("command:"+command+","+"id:"+id+","+"content:"+content);
return super.toString();
}
}
有请求必然有响应:
package com.netty.bean;
/**
* Created by jack on 2018/5/5.
* 封装响应
*/
public class Response {
private long id;//请求ID
private int status;//响应状态
private Object content;//响应内容
private String msg;//请求返回信息
public long getId() {
return id;
}
public void setId(long id) {
this.id = id;
}
public int getStatus() {
return status;
}
public void setStatus(int status) {
this.status = status;
}
public Object getContent() {
return content;
}
public void setContent(Object content) {
this.content = content;
}
public String getMsg() {
return msg;
}
public void setMsg(String msg) {
this.msg = msg;
}
}
接下来这个类很重要,它就是用来处理请求和响应之间的绑定关系:有三个核心功能
& 根据请求ID获取请求对应的响应结果
& 存储客户端请求对应的响应结果信息
& 设置后台线程处理部分客户端的超时请求
接下来看完整的代码:
package com.netty.bean;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
/**
* Created by jack on 2018/5/5.
* 管理请求和响应的关系
* 主要是通过唯一的请求标识id
*/
public class DefaultFuture {
//请求id
private long id;
//请求id对应的响应结果
private volatile Response response;
//存储响应结果和自身绑定在一起
public final static Map<Long,DefaultFuture> FUTURES= new ConcurrentHashMap<Long,DefaultFuture>();
//超时时间
private long timeout;
private final long start=System.currentTimeMillis();
//获取锁
private volatile Lock lock = new ReentrantLock();
//线程通知条件
private volatile Condition condition = lock.newCondition();
public DefaultFuture(ClientRequest request){
id=request.getId();//获取对应的请求ID
FUTURES.put(id, this);//存储当前的请求ID对应的上下文信息
}
/**
* 根据请求id获取响应结果
* @param timeout
* @return
*/
public Response get(long timeout){
long start = System.currentTimeMillis();
lock.lock();//先锁
while(!hasDone()){
try {
condition.await(timeout, TimeUnit.SECONDS);
if(System.currentTimeMillis()-start>=timeout){
break;
}
} catch (InterruptedException e) {
e.printStackTrace();
}finally{
lock.unlock();//释放锁
}
}
return response;
}
/**
* 存储服务器端的响应
* @param res
*/
public static void recive(Response res){
//找到res相对应的DefaultFuture
DefaultFuture future = FUTURES.remove(res.getId());
if(future==null){
return ;
}
Lock lock= future.getLock();
lock.lock();
try{
//设置响应
future.setResponse(res);
Condition condition = future.getCondition();
if(condition!=null){
//通知
condition.signal();
}
}catch(Exception e){
e.printStackTrace();
}finally{
lock.unlock();
}
}
private boolean hasDone() {
return response !=null? true:false;
}
public long getId() {
return id;
}
public void setResponse(Response response) {
this.response = response;
}
public Lock getLock() {
return lock;
}
public Condition getCondition() {
return condition;
}
public long getTimeout() {
return timeout;
}
public long getStart() {
return start;
}
/**
* 处理请求超时的线程
*/
static class FutureTimeOutThread extends Thread{
@Override
public void run() {
while(true){
for(long futureId : FUTURES.keySet()){
DefaultFuture f = FUTURES.get(futureId);
if(f==null){
FUTURES.remove(futureId);//为空的话 代表请求结果已经处理完毕了
continue;
}
if(f.getTimeout()>0){
if((System.currentTimeMillis()-f.getStart())>f.getTimeout()){
Response res = new Response();
res.setContent(null);
res.setMsg("请求超时!");
res.setStatus(1);//响应异常处理
res.setId(f.getId());
DefaultFuture.recive(res);//存储服务端的响应结果信息
}
}
}
}
}
}
/**
* 设置为后台线程
*/
static{
FutureTimeOutThread timeOutThread = new FutureTimeOutThread();
timeOutThread.setDaemon(true);
timeOutThread.start();
}
}
OK 接下来看客户端的业务线程:
这里和以前我们演示相关功能的时候不太一样,之前发送数据都是通过channelActive这个方法来发送数据,但实际生产环境中不是这样使用的,都是通过在客户端直接发送数据并且获取到响应结果:所以接下来的handler功能更加简单
& 读取服务端返回来的数据,并且转换成对应的response
& 把该response和请求响应上下文绑定在一起
package com.netty.handler.tcp;
import com.alibaba.fastjson.JSONObject;
import com.netty.bean.DefaultFuture;
import com.netty.bean.Response;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.handler.timeout.IdleState;
import io.netty.handler.timeout.IdleStateEvent;
import io.netty.util.AttributeKey;
/**
* Created by jack on 2018/5/5.
*/
public class TcpClientHandler extends ChannelInboundHandlerAdapter {
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
//判断服务端和客户端是在能够正常通信的情况下
if(msg.toString().equals("ping")){
ctx.channel().writeAndFlush("ping\r\n");
return ;
}
System.out.println("客户端获取到服务端响应数据:"+msg.toString());
String str = getJSONObject(msg.toString()).toString();
//读取服务端的响应结果
Response res = JSONObject.parseObject(str, Response.class);
//存储响应结果
DefaultFuture.recive(res);
}
private JSONObject getJSONObject(String str){
JSONObject json = JSONObject.parseObject(str);
json.remove("content");
json.put("msg","保存用户信息成功");
return json;
}
}
初始化客户端的channel:
package com.netty.initialzer.tcp;
import com.netty.handler.tcp.TcpClientHandler;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.socket.SocketChannel;
import io.netty.handler.codec.DelimiterBasedFrameDecoder;
import io.netty.handler.codec.Delimiters;
import io.netty.handler.codec.string.StringDecoder;
import io.netty.handler.codec.string.StringEncoder;
/**
* Created by jack on 2018/5/5.
*/
public class TcpClientInitalizer extends ChannelInitializer<SocketChannel> {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
//按照\r\n进行解码
ch.pipeline().addLast(new DelimiterBasedFrameDecoder(Integer.MAX_VALUE, Delimiters.lineDelimiter()[0]));
ch.pipeline().addLast(new StringDecoder());
ch.pipeline().addLast(new TcpClientHandler());
ch.pipeline().addLast(new StringEncoder());
}
}
客户端通过多线程模拟发送并发请求:
package com.netty.client.tcp;
import com.alibaba.fastjson.JSONObject;
import com.netty.bean.ClientRequest;
import com.netty.bean.DefaultFuture;
import com.netty.bean.Response;
import com.netty.bean.User;
import com.netty.initialzer.tcp.TcpClientInitalizer;
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioSocketChannel;
import java.util.Random;
/**
* Created by jack on 2018/5/5.
*/
public class TcpNettyClient {
static EventLoopGroup group =null;
static Bootstrap client =null;
public static ChannelFuture future=null;
static {
group = new NioEventLoopGroup();
client = new Bootstrap();
client.group(group);
client.channel(NioSocketChannel.class);
client.option(ChannelOption.SO_KEEPALIVE,true);
client.handler(new TcpClientInitalizer());
try {
future = client.connect("localhost", 8080).sync();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
//发送数据的方法
public static Object send(ClientRequest request){
try{
System.out.println("客户端向服务端发送请求数据:"+JSONObject.toJSONString(request));
//客户端直接发送请求数据到服务端
future.channel().writeAndFlush(JSONObject.toJSONString(request));
//根据\r\n进行换行
future.channel().writeAndFlush("\r\n");
//通过请求实例化请求和响应之间的关系
DefaultFuture defaultFuture = new DefaultFuture(request);
//通过请求ID,获取对应的响应处理结果
Response response = defaultFuture.get(10);
return response;
}catch(Exception e){
e.printStackTrace();
}
return null;
}
public static void main(String[] args) {
for(int i=0;i<20;i++){
new Thread(new UserRequestThread(i)).start();//模拟多线程并发请求
}
}
/**
* 模拟用户并发请求
*/
static class UserRequestThread implements Runnable{
private int requestId;
public UserRequestThread(int requestId){
this.requestId = requestId;
}
public void run() {
synchronized (UserRequestThread.class){
ClientRequest request = new ClientRequest();
request.setCommand("saveUser");
User user = new User();
user.setAge(new Random().nextInt(4)*requestId);
user.setId(requestId);
user.setName("jiahp"+requestId);
request.setContent(user);
//拿到请求的结果
Object result = TcpNettyClient.send(request);
System.out.println("客户端长连接测试返回结果:"+JSONObject.toJSONString(result));
System.out.println(" ");
}
}
}
}
看一下服务端的handler:
& 获取客户端的request
& 解析完成成之后实例化对应的response
& 写入reponse到客户端并加上\r\n
package com.netty.handler.tcp;
import com.alibaba.fastjson.JSONObject;
import com.netty.Media;
import com.netty.bean.Response;
import com.netty.bean.ServerRequest;
import com.netty.bean.User;
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.handler.timeout.IdleState;
import io.netty.handler.timeout.IdleStateEvent;
import java.nio.charset.Charset;
/**
* Created by jack on 2018/5/5.
*/
public class TcpServerHandler extends ChannelInboundHandlerAdapter {
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
if(msg instanceof ByteBuf){
ByteBuf req = (ByteBuf)msg;
String content = req.toString(Charset.defaultCharset());
System.out.println("服务端开始读取客户端的请求数据:"+content);
//获取客户端的请求信息
ServerRequest request = JSONObject.parseObject(content,ServerRequest.class);
JSONObject user = (JSONObject) request.getContent();
user.put("success","ok");
//写入解析请求之后结果对应的响应信息
Response res = new Response();
res.setId(request.getId());
res.setContent(user);
//先写入
ctx.channel().write(JSONObject.toJSONString(res));
//再一起刷新
ctx.channel().writeAndFlush("\r\n");
System.out.println(" ");
}
}
@Override
public void userEventTriggered(ChannelHandlerContext ctx, Object evt)
throws Exception {
if(evt instanceof IdleStateEvent){
IdleStateEvent event = (IdleStateEvent)evt;
if(event.equals(IdleState.READER_IDLE)){
System.out.println("读空闲====");
ctx.close();
}else if(event.equals(IdleState.WRITER_IDLE)){
System.out.println("写空闲====");
}else if(event.equals(IdleState.WRITER_IDLE)){
System.out.println("读写空闲====");
ctx.channel().writeAndFlush("ping\r\n");
}
}
super.userEventTriggered(ctx, evt);
}
}
对应的初始化channel:这里和之前演示的基本功能不一样,因为客户端和服务端要一致保持联系,所以需要增加一个客户端和服务端之间的心跳检查机制handler,netty中已经为我们实现好了,可以直接拿过来使用:
package com.netty.initialzer.tcp;
import com.netty.handler.tcp.TcpServerHandler;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.socket.SocketChannel;
import io.netty.handler.codec.DelimiterBasedFrameDecoder;
import io.netty.handler.codec.Delimiters;
import io.netty.handler.codec.string.StringEncoder;
import io.netty.handler.timeout.IdleStateHandler;
import java.util.concurrent.TimeUnit;
/**
* Created by jack on 2018/5/5.
*/
public class TcpServerInitalizer extends ChannelInitializer<SocketChannel> {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
ch.pipeline().addLast(new DelimiterBasedFrameDecoder(Integer.MAX_VALUE, Delimiters.lineDelimiter()[0]));
//添加客户端和服务端之间的心跳检查状态
ch.pipeline().addLast(new IdleStateHandler(6, 2, 1, TimeUnit.SECONDS));
ch.pipeline().addLast(new TcpServerHandler());
ch.pipeline().addLast(new StringEncoder());
}
}
最后服务端代码:
package com.netty.server.tcp;
import com.netty.initialzer.tcp.TcpServerInitalizer;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.buffer.PooledByteBufAllocator;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import java.net.InetSocketAddress;
/**
* Created by jack on 2018/5/5.
*/
public class TcpNettyServer {
static EventLoopGroup bossLoopGroup;
static EventLoopGroup workLoopGroup;
static ServerBootstrap server;
static {
bossLoopGroup = new NioEventLoopGroup();
workLoopGroup = new NioEventLoopGroup();
server = new ServerBootstrap();
server.group(bossLoopGroup,workLoopGroup);
server.channel(NioServerSocketChannel.class);
server.option(ChannelOption.SO_BACKLOG,128);
server.option(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT);
server.option(ChannelOption.SO_KEEPALIVE,true);
//注意服务端这里一定要用childHandler 不能用handler 否则会报错
server.childHandler(new TcpServerInitalizer());
}
public static void run(int port){
try {
ChannelFuture future = server.bind(new InetSocketAddress(port)).sync();
future.channel().closeFuture().sync();
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
bossLoopGroup.shutdownGracefully();
workLoopGroup.shutdownGracefully();
}
}
public static void main(String[] args) {
run(8080);
}
}
OK 整体代码就分析完了,看一下效果:
服务端接受到客户端的请求,因为是多线程的,所以ID是乱序的
客户端获取到服务端的响应结果:
当然服务端和客户端的连接仍然是保持连接状态的;
这样就实现了基于netty的长连接功能,稍加功能就可以在生成环境中使用.