文章目录
更多博客内容见个人博客:https://itboyer.github.io
引言
最近研究了Netty的相关技术,用于实施高并发场景下的消息通信,期间搜集了大量资料,围绕着netty的channel连接池的设计,这个稍微有些复杂的主题,做了大量功课,其中牵扯到蛮多技术点,要想在网上找到相关的又相对完整的参考文章,确实不太容易。在此记录一下实现的方案,用于技术沉淀。
首先,阅读本文之前需要具备一些基础知识:
- socket通信和长短连接
- 知道Netty的执行流程和相关API操作
- 理解什么是TCP半包,了解Netty提供的粘包和拆包解码器
在此贴出一些学习过程中遇到的优秀Blog
官方文档
分隔符解码器处理半包问题
netty实战-netty client连接池设计(Netty官方新版本中已经实现了简单的连接池,可以学习连接池的设计思想)
线程模型
首先,整个系统架构的线程模型如下:
同步通信机制
其次我们需要关注单线程内的同步请求和响应
抛出问题:
Q1:如何实现基于Netty的“请求-响应”同步通信机制
Netty提供了异步IO和同步IO的统一实现,但是我们的需求其实和IO的同步异步并无关系。我们的关键是要实现请求-响应这种典型的一问一答交互方式。用于实现微服务之间的调用和返回结果获取,要实现这个需求,需要解决两个问题:
a. 请求和响应的正确匹配。
当服务端返回响应结果的时候,怎么和客户端的请求正确匹配起来呢?解决方式:通过客户端唯一的RequestId,服务端返回的响应中需要包含该RequestId,这样客户端就可以通过RequestId来正确匹配请求响应。
b. 请求线程和响应线程的通信。
因为请求线程会在发出请求后,同步等待服务端的返回。因此,就需要解决,Netty在接受到响应之后,怎么通知请求线程结果。
方案:使用LinkedBlockingQueue阻塞任务队列,使用take()获取相应的返回结果
首先需要对每一个请求标识一个全局唯一的标识,下面贴出核心代码:
NettyChannelPoolHandler.java
@Slf4j
public class ChannelTaskThread implements Callable<String> {
/**
* netty channel池
*/
final NettyClientPool nettyClientPool = NettyClientPool.getInstance();
private String message;
public ChannelTaskThread(String message){
this.message = message;
}
@Override
public String call(){
SimpleDateFormat sdf = new SimpleDateFormat("yyyyMMddHHmmssSSS");
//同一个线程使用同一个全局唯一的随机数,保证从同一个池中获取和释放资源,同时使用改随机数作为Key获取返回值,时间戳+6位随机数
long random = Long.valueOf(sdf.format(new Date())) * 1000000 + Math.round(Math.random() * 1000000);
Channel channel = nettyClientPool.getChannel(random);
log.debug("在链接池池中取到的Channel: "+ channel.id());
UnpooledByteBufAllocator allocator = new UnpooledByteBufAllocator(false);
ByteBuf buffer = allocator.buffer(20);
//使用固定分隔符的半包解码器
String msg = message + DataBusConstant.DELIMITER;
buffer.writeBytes(msg.getBytes());
NettyClientHandler tcpClientHandler = channel.pipeline().get(NettyClientHandler.class);
ChannelId id = channel.id();
log.info("SEND SEQNO[{}] MESSAGE AND CHANNEL id [{}]",random,id);
String serverMsg = tcpClientHandler.sendMessage(buffer, channel);
NettyClientPool.release(channel);
return "请求SEQNO["+random+"] "+ serverMsg;
}
}
NettyClientHandler.java
@Slf4j
public class NettyClientHandler extends ChannelInboundHandlerAdapter {
/**
* 使用阻塞式LinkedBlockingQueue,对响应结果保存
* 用于记录通道响应的结果集
*/
private static final Map<Long, LinkedBlockingQueue<String>> RESULT_MAP = new ConcurrentHashMap<>();
public String sendMessage(ByteBuf message,Channel ch) {
LinkedBlockingQueue<String> linked = new LinkedBlockingQueue<>(1);
//获取channel中存储的全局唯一随机值
Long randomId = ch.attr(AttributeKey.<Long>valueOf(DataBusConstant.RANDOM_KEY)).get();
RESULT_MAP.put(randomId,linked);
ch.writeAndFlush(message);
String res = null;
try {
//设置3分钟的获取超时时间或者使用take()--获取不到返回结果一直阻塞
res = RESULT_MAP.get(randomId).poll(3,TimeUnit.MINUTES);
RESULT_MAP.remove(randomId);
}catch (Exception e){
e.printStackTrace();
}
return res;
}
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg){
String message = null;
if(msg instanceof String){
message = msg.toString();
}else if(msg instanceof ByteBuf){
message = ((ByteBuf)msg).toString(Charset.defaultCharset());
}
//获取channel中存储的全局唯一随机值
Long randomId = ctx.channel().attr(AttributeKey.<Long>valueOf(DataBusConstant.RANDOM_KEY)).get();
log.info(" READ INFO 服务端返回结果:"+ message);
LinkedBlockingQueue<String> linked = RESULT_MAP.get(randomId);
linked.add(message);
}
@Override
public void channelReadComplete(ChannelHandlerContext ctx){
boolean active = ctx.channel().isActive();
log.debug("[此时通道状态] {}", active);
}
}
连接池的创建
2. 官方提供的FixedChannelPool支持固定连接的连接池,但是不支持连接池的动态回收
直接贴连接池的创建代码,通道的动态回收结合心跳机制实现:
NettyClientPool.java
@Slf4j
//和Spring整合
//@Order(Integer.MAX_VALUE+1)
//@Component
//@Scope(ConfigurableBeanFactory.SCOPE_SINGLETON)
public class NettyClientPool {
/**
* volatile保持线程之间的可见性,连接池的创建是单例,在这里可加可不加
*/
volatile private static NettyClientPool nettyClientPool;
/**
* key为目标主机的InetSocketAddress对象,value为目标主机对应的连接池
*/
public ChannelPoolMap<InetSocketAddress, FixedChannelPool> poolMap;
final EventLoopGroup group = new NioEventLoopGroup();
final Bootstrap strap = new Bootstrap();
// @Value("${netty.server.addresses}")
private static String addresses = "127.0.0.1:7000";
volatile private static Map<InetSocketAddress,FixedChannelPool> pools = new HashMap<>(4);
volatile private static List<InetSocketAddress> addressList;
private NettyClientPool(){
//如果和Spring整合,构造方法内的build方法调用注掉
build();
}
/**
* 单例
* @return
*/
// @Bean(initMethod = "build")
public static NettyClientPool getInstance(){
if(nettyClientPool == null) {
synchronized (NettyClientPool.class) {
if(nettyClientPool == null) {
nettyClientPool = new NettyClientPool();
}
}
}
return nettyClientPool;
}
public void build(){
log.info("NettyClientPool build......");
strap.group(group).channel(NioSocketChannel.class).option(ChannelOption.TCP_NODELAY, true)
.option(ChannelOption.SO_KEEPALIVE, true);
poolMap = new AbstractChannelPoolMap<InetSocketAddress, FixedChannelPool>() {
@Override
protected FixedChannelPool newPool(InetSocketAddress key) {
return new FixedChannelPool(strap.remoteAddress(key), new NettyChannelPoolHandler(), DataBusConstant.MAX_CONNECTIONS);
}
};
getInetAddresses(addresses);
for (InetSocketAddress address: addressList){
pools.put(address,poolMap.get(address));
}
}
/**
* <pre>功能描述:
* 根据随机数取出随机的server对应pool,从pool中取出channel
* pool.acquiredChannelCount(); 对应池中的channel数目
* 连接池的动态扩容: 指定最大连接数为{@link Integer.MAX_VALUE},如果连接池队列中取不到channel,会自动创建channel,默认使用FIFO的获取方式,回收的channel优先被再次get到
* SERVER的宕机自动切换: 指定重试次数,get()发生连接异常,则对随机数+1,从下一个池中重新获取,
*
* 后期如有必要可优化为:Server注册到注册中心,从注册中心获取连接池对应的address,或者注册到zookeeper中,都需要单独写实现
*
* </pre>
* @方法名称 getChannel
* @作者 zhangdong
* @创建时间 2019/4/23 11:39
* @param random
* @return io.netty.channel.Channel
*/
public Channel getChannel (long random){
int retry = 0;
Channel channel = null;
try {
//按时间戳取余
Long poolIndex = random % pools.size();
InetSocketAddress address = addressList.get(poolIndex.intValue());
FixedChannelPool pool = pools.get(address);
Future<Channel> future = pool.acquire();
channel = future.get();
AttributeKey<Long> randomID = AttributeKey.valueOf(DataBusConstant.RANDOM_KEY);
channel.attr(randomID).set(random);
//如果是因为服务端挂点,连接失败而获取不到channel,则随机数执行+1操作,从下一个池获取
}catch (ExecutionException e){
log.error(e.getMessage());
//每个池,尝试获取取2次
int count = 2;
if(retry < addressList.size() * count){
retry ++;
return getChannel( ++ random);
}else {
log.error("没有可以获取到channel连接的server,server list [{}]",addressList);
throw new RuntimeException("没有可以获取到channel连接的server");
}
}
catch (InterruptedException e){
e.printStackTrace();
}catch (Exception e){
e.printStackTrace();
}
return channel;
}
/**
* <pre>功能描述:
* 回收channel进池,需要保证随机值和getChannel获取到的随机值是同一个,才能从同一个pool中释放资源
* </pre>
* @方法名称 release
* @作者 zhangdong
* @创建时间 2019/4/23 11:16
* @param ch
* @return void
*/
public static void release(Channel ch){
long random = ch.attr(AttributeKey.<Long>valueOf(DataBusConstant.RANDOM_KEY)).get();
ch.flush();
Long poolIndex = random % pools.size();
pools.get(addressList.get(poolIndex.intValue())).release(ch);
}
/**
* 获取线程池的hash值
* @param ch
* @return
*/
public static int getPoolHash(Channel ch){
long random = ch.attr(AttributeKey.<Long>valueOf(DataBusConstant.RANDOM_KEY)).get();
Long poolIndex = random % pools.size();
return System.identityHashCode(pools.get(addressList.get(poolIndex.intValue())));
}
/**
* <pre>功能描述:
* 获取服务端server列表,每个server对应一个pool
* </pre>
* @方法名称 getInetAddredd
* @作者 zhangdong
* @创建时间 2019/4/23 11:17
* @param addresses
* @return void
*/
public void getInetAddresses(String addresses){
addressList = new ArrayList<>(4);
if(StringUtils.isEmpty(addresses)){
throw new RuntimeException("address列表为空");
}
String[] splits = addresses.split(",");
for (String address: splits){
String[] split = address.split(":");
if(split.length==0){
throw new RuntimeException("[" + address + "]不符合IP:PORT格式");
}
addressList.add(new InetSocketAddress(split[0], Integer.parseInt(split[1])));
}
}
}
NettyChannelPoolHandler.java
@Slf4j
public class NettyChannelPoolHandler implements ChannelPoolHandler {
static final ByteBuf byteBuf = Unpooled.copiedBuffer(DataBusConstant.DELIMITER.getBytes());
@Override
public void channelReleased(Channel ch){
ch.writeAndFlush(Unpooled.EMPTY_BUFFER);
log.info("|-->回收Channel. Channel ID: " + ch.id());
}
@Override
public void channelAcquired(Channel ch){
log.info("|-->获取Channel. Channel ID: " + ch.id());
}
@Override
public void channelCreated(Channel ch){
log.info("|-->创建Channel. Channel ID: " + ch.id()
+"\r\n|-->创建Channel. Channel REAL HASH: " + System.identityHashCode(ch));
SocketChannel channel = (SocketChannel) ch;
channel.config().setKeepAlive(true);
channel.config().setTcpNoDelay(true);
channel.pipeline()
//开启Netty自带的心跳处理器,每5秒发送一次心跳
.addLast(new IdleStateHandler(0, 0, 5,TimeUnit.SECONDS))
.addLast(new DelimiterBasedFrameDecoder(Integer.MAX_VALUE,byteBuf))
.addLast(new NettyClientHandler());
}
}
通道的动态回收
3. 心跳机制的实现保证心跳不会失活,丢失心跳包的通道的管理,参考上面的 NettyChannelPoolHandler 处理器
动态通道回收,在 NettyClientHandler 类中实现userEventTriggered方法
volatile static Map<Integer,Set<Channel>> coreChannel = new HashMap();
@Override
public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
log.info("[客户端心跳监测发送] 通道编号:{}", ctx.channel().id());
Channel channel = ctx.channel();
if (evt instanceof IdleStateEvent) {
//当客户端开始发送心跳检测时。说明没有业务请求过来,释放通道数为设定的 CORE_CONNECTIONS
if(channel.isActive()){
//使用pool的hash值作为Key,维护 CORE_CONNECTIONS个数个通道,多余的关闭
int poolHash = NettyClientPool.getPoolHash(channel);
Set<Channel> channels = coreChannel.get(poolHash);
channels = channels == null ? new HashSet<>(DataBusConstant.CORE_CONNECTIONS) : channels;
channels.add(channel);
if(channels.stream().filter(x-> x.isActive()).count() > DataBusConstant.CORE_CONNECTIONS){
log.info("关闭 CORE_CONNECTIONS 范围之外的通道:{}",channel.id());
channels.remove(channel);
channel.close();
}
coreChannel.put(poolHash,channels);
}
String heartBeat = DataBusConstant.HEART_BEAT + DataBusConstant.DELIMITER;
ByteBuf byteBuf = Unpooled.copiedBuffer(heartBeat.getBytes());
channel.writeAndFlush(byteBuf);
} else {
super.userEventTriggered(ctx, evt);
}
}
辅助类
常量类
public class DataBusConstant {
public static final String DELIMITER = "%#_#%";
public static final String HEART_BEAT = "ping-pong-ping-pong";
/**
* 最大连接数
*/
public static final int MAX_CONNECTIONS = Integer.MAX_VALUE;
/**
* 核心链接数,该数目内的通道 在没有业务请求时发送心跳防止失活,超过部分的通道close掉
*/
public static final int CORE_CONNECTIONS = 1;
/**
* 同一个线程使用同一个全局唯一的随机数,保证从同一个池中获取和释放资源,同时使用改随机数作为Key获取返回值
*/
public static final String RANDOM_KEY = "randomID";
/**
* 服务端丢失心跳次数,达到该次数,则关闭通道,默认3次
*/
public static final int LOOS_HEART_BEAT_COUNT = 3;
}
任务连接池
@Slf4j
public class NettyTaskPool {
/**
* 线程池线程数量,对应CachedThreadPoolExecutor
*/
private static final int CORE_POLL_SIZE = 0;
private static final int MAX_POLL_SIZE = Integer.MAX_VALUE;
//手动创建线程池
private static final ThreadPoolExecutor threadPool = new ThreadPoolExecutor(
CORE_POLL_SIZE,
MAX_POLL_SIZE,
3,
TimeUnit.MINUTES,
new LinkedBlockingQueue<>(),
new ThreadPoolExecutor.DiscardOldestPolicy());
public static String submitTask(String message) throws Exception{
//单个任务在线程池内分配单个线程,用于同步等待封装的返回结果
Future<String> submit = threadPool.submit(new ChannelTaskThread(message));
String response = submit.get();
log.info("\n\t submitTask 返回的 Response: \r\n\t\t[ "+ response +" ]\n");
return response;
}
}
测试类
public class TestPool {
public static void main(String[] args) throws Exception{
for (int i = 0; i < 5; i++) {
//模拟多线程客户端,提交任务,
new Thread(()-> {
try {
for (int j = 0; j < 10; j++) {
String longMsgBody = j + "中华人民共和国,中华人民共和国,中华人民共和国,中华人民共和国,中华人民共和国" + j;
NettyTaskPool.submitTask(longMsgBody);
}
} catch (Exception e) {
e.printStackTrace();
}
}).start();
}
}
}
server端省略