更多博客内容见个人博客:https://itboyer.github.io

引言

最近研究了Netty的相关技术,用于实施高并发场景下的消息通信,期间搜集了大量资料,围绕着netty的channel连接池的设计,这个稍微有些复杂的主题,做了大量功课,其中牵扯到蛮多技术点,要想在网上找到相关的又相对完整的参考文章,确实不太容易。在此记录一下实现的方案,用于技术沉淀。

首先,阅读本文之前需要具备一些基础知识:

  1. socket通信和长短连接
  2. 知道Netty的执行流程和相关API操作
  3. 理解什么是TCP半包,了解Netty提供的粘包和拆包解码器

在此贴出一些学习过程中遇到的优秀Blog
官方文档
分隔符解码器处理半包问题
netty实战-netty client连接池设计(Netty官方新版本中已经实现了简单的连接池,可以学习连接池的设计思想)

线程模型

首先,整个系统架构的线程模型如下:

Netty Client实战——高并发连接池方案

同步通信机制

其次我们需要关注单线程内的同步请求和响应
抛出问题:
Q1:如何实现基于Netty的“请求-响应”同步通信机制

Netty提供了异步IO和同步IO的统一实现,但是我们的需求其实和IO的同步异步并无关系。我们的关键是要实现请求-响应这种典型的一问一答交互方式。用于实现微服务之间的调用和返回结果获取,要实现这个需求,需要解决两个问题:
a. 请求和响应的正确匹配。
当服务端返回响应结果的时候,怎么和客户端的请求正确匹配起来呢?解决方式:通过客户端唯一的RequestId,服务端返回的响应中需要包含该RequestId,这样客户端就可以通过RequestId来正确匹配请求响应。
b. 请求线程和响应线程的通信。
因为请求线程会在发出请求后,同步等待服务端的返回。因此,就需要解决,Netty在接受到响应之后,怎么通知请求线程结果。

方案:使用LinkedBlockingQueue阻塞任务队列,使用take()获取相应的返回结果
首先需要对每一个请求标识一个全局唯一的标识,下面贴出核心代码:

NettyChannelPoolHandler.java

@Slf4j
public class ChannelTaskThread implements Callable<String> {


    /**
     * netty channel池
     */
    final NettyClientPool nettyClientPool = NettyClientPool.getInstance();

    private String message;

    public ChannelTaskThread(String message){
        this.message = message;
    }

    @Override
    public String call(){
         SimpleDateFormat sdf = new SimpleDateFormat("yyyyMMddHHmmssSSS");
        //同一个线程使用同一个全局唯一的随机数,保证从同一个池中获取和释放资源,同时使用改随机数作为Key获取返回值,时间戳+6位随机数
        long random = Long.valueOf(sdf.format(new Date())) * 1000000 + Math.round(Math.random() * 1000000);

        Channel channel = nettyClientPool.getChannel(random);
        log.debug("在链接池池中取到的Channel: "+ channel.id());
        UnpooledByteBufAllocator allocator = new UnpooledByteBufAllocator(false);
        ByteBuf buffer = allocator.buffer(20);
			//使用固定分隔符的半包解码器
        String msg = message + DataBusConstant.DELIMITER;
        buffer.writeBytes(msg.getBytes());
        NettyClientHandler tcpClientHandler = channel.pipeline().get(NettyClientHandler.class);
        ChannelId id = channel.id();
        log.info("SEND SEQNO[{}] MESSAGE AND CHANNEL id [{}]",random,id);

        String serverMsg = tcpClientHandler.sendMessage(buffer, channel);
        NettyClientPool.release(channel);
        return "请求SEQNO["+random+"] "+ serverMsg;
    }
}

NettyClientHandler.java

@Slf4j
public class NettyClientHandler extends ChannelInboundHandlerAdapter {

    /**
     * 使用阻塞式LinkedBlockingQueue,对响应结果保存
     * 用于记录通道响应的结果集
     */
    private static final Map<Long, LinkedBlockingQueue<String>> RESULT_MAP = new ConcurrentHashMap<>();

    public String sendMessage(ByteBuf message,Channel ch) {
        LinkedBlockingQueue<String> linked = new LinkedBlockingQueue<>(1);
        //获取channel中存储的全局唯一随机值
        Long randomId = ch.attr(AttributeKey.<Long>valueOf(DataBusConstant.RANDOM_KEY)).get();
        RESULT_MAP.put(randomId,linked);
        ch.writeAndFlush(message);
        String res = null;
        try {
            //设置3分钟的获取超时时间或者使用take()--获取不到返回结果一直阻塞
            res = RESULT_MAP.get(randomId).poll(3,TimeUnit.MINUTES);
            RESULT_MAP.remove(randomId);
        }catch (Exception e){
            e.printStackTrace();
        }
        return res;
    }

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg){
        String message = null;
        if(msg instanceof String){
            message = msg.toString();
        }else if(msg instanceof ByteBuf){
            message = ((ByteBuf)msg).toString(Charset.defaultCharset());
        }
        //获取channel中存储的全局唯一随机值
        Long randomId = ctx.channel().attr(AttributeKey.<Long>valueOf(DataBusConstant.RANDOM_KEY)).get();
        log.info(" READ INFO 服务端返回结果:"+ message);
        LinkedBlockingQueue<String> linked = RESULT_MAP.get(randomId);
        linked.add(message);
    }

    @Override
    public void channelReadComplete(ChannelHandlerContext ctx){
        boolean active = ctx.channel().isActive();
        log.debug("[此时通道状态] {}", active);
    }

}

连接池的创建

2. 官方提供的FixedChannelPool支持固定连接的连接池,但是不支持连接池的动态回收
直接贴连接池的创建代码,通道的动态回收结合心跳机制实现:

NettyClientPool.java

@Slf4j
//和Spring整合
//@Order(Integer.MAX_VALUE+1)
//@Component
//@Scope(ConfigurableBeanFactory.SCOPE_SINGLETON)
public class NettyClientPool {

    /**
     * volatile保持线程之间的可见性,连接池的创建是单例,在这里可加可不加
     */
    volatile private static NettyClientPool nettyClientPool;
    /**
     * key为目标主机的InetSocketAddress对象,value为目标主机对应的连接池
     */
    public ChannelPoolMap<InetSocketAddress, FixedChannelPool> poolMap;

    final EventLoopGroup group = new NioEventLoopGroup();
    final Bootstrap strap = new Bootstrap();

//    @Value("${netty.server.addresses}")
    private static String addresses = "127.0.0.1:7000";

    volatile private static Map<InetSocketAddress,FixedChannelPool> pools = new HashMap<>(4);
    volatile private static List<InetSocketAddress> addressList;

    private NettyClientPool(){
        //如果和Spring整合,构造方法内的build方法调用注掉
        build();
    }

    /**
     * 单例
     * @return
     */
    // @Bean(initMethod = "build")
    public static NettyClientPool getInstance(){
        if(nettyClientPool == null) {
            synchronized (NettyClientPool.class) {
                if(nettyClientPool == null) {
                    nettyClientPool = new NettyClientPool();
                }
            }
        }
        return nettyClientPool;
    }

    public void build(){
        log.info("NettyClientPool build......");
        strap.group(group).channel(NioSocketChannel.class).option(ChannelOption.TCP_NODELAY, true)
                .option(ChannelOption.SO_KEEPALIVE, true);

        poolMap = new AbstractChannelPoolMap<InetSocketAddress, FixedChannelPool>() {
            @Override
            protected FixedChannelPool newPool(InetSocketAddress key) {
                return new FixedChannelPool(strap.remoteAddress(key), new NettyChannelPoolHandler(), DataBusConstant.MAX_CONNECTIONS);
            }
        };
        getInetAddresses(addresses);

        for (InetSocketAddress address: addressList){
            pools.put(address,poolMap.get(address));
        }
    }

    /**
     * <pre>功能描述:
     * 根据随机数取出随机的server对应pool,从pool中取出channel
     *   pool.acquiredChannelCount(); 对应池中的channel数目
     *   连接池的动态扩容: 指定最大连接数为{@link Integer.MAX_VALUE},如果连接池队列中取不到channel,会自动创建channel,默认使用FIFO的获取方式,回收的channel优先被再次get到
     *   SERVER的宕机自动切换: 指定重试次数,get()发生连接异常,则对随机数+1,从下一个池中重新获取,
     *
     *   后期如有必要可优化为:Server注册到注册中心,从注册中心获取连接池对应的address,或者注册到zookeeper中,都需要单独写实现
     *
     * </pre>
     * @方法名称 getChannel
     * @作者 zhangdong
     * @创建时间 2019/4/23 11:39
     * @param random
     * @return io.netty.channel.Channel
     */
    public Channel getChannel (long random){
        int retry = 0;
        Channel channel = null;
        try {
            //按时间戳取余
            Long poolIndex = random % pools.size();
            InetSocketAddress address = addressList.get(poolIndex.intValue());
            FixedChannelPool pool = pools.get(address);
            Future<Channel> future = pool.acquire();
            channel = future.get();
            AttributeKey<Long> randomID = AttributeKey.valueOf(DataBusConstant.RANDOM_KEY);
            channel.attr(randomID).set(random);
            //如果是因为服务端挂点,连接失败而获取不到channel,则随机数执行+1操作,从下一个池获取
        }catch (ExecutionException e){
            log.error(e.getMessage());
            //每个池,尝试获取取2次
            int count = 2;
            if(retry < addressList.size() * count){
                retry ++;
                return getChannel( ++ random);
            }else {
                log.error("没有可以获取到channel连接的server,server list [{}]",addressList);
                throw new RuntimeException("没有可以获取到channel连接的server");
            }
        }
        catch (InterruptedException e){
            e.printStackTrace();
        }catch (Exception e){
            e.printStackTrace();
        }
        return channel;
    }


    /**
     * <pre>功能描述:
     *  回收channel进池,需要保证随机值和getChannel获取到的随机值是同一个,才能从同一个pool中释放资源
     * </pre>
     * @方法名称 release
     * @作者 zhangdong
     * @创建时间 2019/4/23 11:16
     * @param ch
     * @return void
     */
    public static void release(Channel ch){
        long random = ch.attr(AttributeKey.<Long>valueOf(DataBusConstant.RANDOM_KEY)).get();
        ch.flush();
        Long poolIndex = random % pools.size();
        pools.get(addressList.get(poolIndex.intValue())).release(ch);
    }

    /**
     * 获取线程池的hash值
     * @param ch
     * @return
     */
    public static int getPoolHash(Channel ch){
        long random = ch.attr(AttributeKey.<Long>valueOf(DataBusConstant.RANDOM_KEY)).get();
        Long poolIndex = random % pools.size();
        return System.identityHashCode(pools.get(addressList.get(poolIndex.intValue())));
    }

    /**
     * <pre>功能描述:
     * 获取服务端server列表,每个server对应一个pool
     * </pre>
     * @方法名称 getInetAddredd
     * @作者 zhangdong
     * @创建时间 2019/4/23 11:17
     * @param addresses
     * @return void
     */
    public void getInetAddresses(String addresses){
        addressList = new ArrayList<>(4);
        if(StringUtils.isEmpty(addresses)){
            throw new RuntimeException("address列表为空");
        }
        String[] splits = addresses.split(",");
        for (String address: splits){
            String[] split = address.split(":");
            if(split.length==0){
                throw new RuntimeException("[" + address + "]不符合IP:PORT格式");
            }
            addressList.add(new InetSocketAddress(split[0], Integer.parseInt(split[1])));
        }
    }
}

NettyChannelPoolHandler.java

@Slf4j
public class NettyChannelPoolHandler implements ChannelPoolHandler {

    static final ByteBuf byteBuf = Unpooled.copiedBuffer(DataBusConstant.DELIMITER.getBytes());

    @Override
    public void channelReleased(Channel ch){
        ch.writeAndFlush(Unpooled.EMPTY_BUFFER);
        log.info("|-->回收Channel. Channel ID: " + ch.id());
    }

    @Override
    public void channelAcquired(Channel ch){
        log.info("|-->获取Channel. Channel ID: " + ch.id());
    }

    @Override
    public void channelCreated(Channel ch){

        log.info("|-->创建Channel. Channel ID: " + ch.id()
                 +"\r\n|-->创建Channel. Channel REAL HASH: " + System.identityHashCode(ch));
        SocketChannel channel = (SocketChannel) ch;
        channel.config().setKeepAlive(true);
        channel.config().setTcpNoDelay(true);
        channel.pipeline()
                //开启Netty自带的心跳处理器,每5秒发送一次心跳
                .addLast(new IdleStateHandler(0, 0, 5,TimeUnit.SECONDS))
                .addLast(new DelimiterBasedFrameDecoder(Integer.MAX_VALUE,byteBuf))
                .addLast(new NettyClientHandler());
    }
}

通道的动态回收

3. 心跳机制的实现保证心跳不会失活,丢失心跳包的通道的管理,参考上面的 NettyChannelPoolHandler 处理器
动态通道回收,在 NettyClientHandler 类中实现userEventTriggered方法

    
    volatile static Map<Integer,Set<Channel>> coreChannel = new HashMap();

    @Override
    public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
        log.info("[客户端心跳监测发送] 通道编号:{}", ctx.channel().id());
        Channel channel = ctx.channel();
        if (evt instanceof IdleStateEvent) {
            //当客户端开始发送心跳检测时。说明没有业务请求过来,释放通道数为设定的 CORE_CONNECTIONS
            if(channel.isActive()){
                //使用pool的hash值作为Key,维护 CORE_CONNECTIONS个数个通道,多余的关闭
                int poolHash = NettyClientPool.getPoolHash(channel);
                Set<Channel> channels = coreChannel.get(poolHash);
                channels = channels == null ? new HashSet<>(DataBusConstant.CORE_CONNECTIONS) : channels;
                channels.add(channel);
                if(channels.stream().filter(x-> x.isActive()).count() > DataBusConstant.CORE_CONNECTIONS){
                    log.info("关闭 CORE_CONNECTIONS 范围之外的通道:{}",channel.id());
                    channels.remove(channel);
                    channel.close();
                }
                coreChannel.put(poolHash,channels);
            }
            String heartBeat = DataBusConstant.HEART_BEAT + DataBusConstant.DELIMITER;
            ByteBuf byteBuf = Unpooled.copiedBuffer(heartBeat.getBytes());
            channel.writeAndFlush(byteBuf);
        } else {
            super.userEventTriggered(ctx, evt);
        }
    }

辅助类

常量类

public class DataBusConstant {

    public static final String DELIMITER = "%#_#%";

    public static final String HEART_BEAT = "ping-pong-ping-pong";

    /**
     * 最大连接数
     */
    public static final int MAX_CONNECTIONS = Integer.MAX_VALUE;

    /**
     * 核心链接数,该数目内的通道 在没有业务请求时发送心跳防止失活,超过部分的通道close掉
     */
    public static final int CORE_CONNECTIONS = 1;

    /**
     * 同一个线程使用同一个全局唯一的随机数,保证从同一个池中获取和释放资源,同时使用改随机数作为Key获取返回值
     */
    public static final String RANDOM_KEY = "randomID";

    /**
     * 服务端丢失心跳次数,达到该次数,则关闭通道,默认3次
     */
    public static final int LOOS_HEART_BEAT_COUNT = 3;

}

任务连接池

@Slf4j
public class NettyTaskPool {

    /**
     * 线程池线程数量,对应CachedThreadPoolExecutor
     */
    private static final int CORE_POLL_SIZE = 0;
    private static final int MAX_POLL_SIZE = Integer.MAX_VALUE;
	 
    //手动创建线程池
    private static final ThreadPoolExecutor threadPool = new ThreadPoolExecutor(
            CORE_POLL_SIZE,
            MAX_POLL_SIZE,
            3,
            TimeUnit.MINUTES,
            new LinkedBlockingQueue<>(),
            new ThreadPoolExecutor.DiscardOldestPolicy());

    public static String submitTask(String message) throws Exception{
        //单个任务在线程池内分配单个线程,用于同步等待封装的返回结果
        Future<String> submit = threadPool.submit(new ChannelTaskThread(message));
        String response = submit.get();
        log.info("\n\t submitTask 返回的 Response:  \r\n\t\t[ "+ response +" ]\n");
        return response;
    }

}

测试类

public class TestPool {

    public static void main(String[] args) throws Exception{
        for (int i = 0; i < 5; i++) {
            //模拟多线程客户端,提交任务,
            new Thread(()-> {
                try {
                    for (int j = 0; j < 10; j++) {
                       String longMsgBody = j + "中华人民共和国,中华人民共和国,中华人民共和国,中华人民共和国,中华人民共和国" + j;
                        NettyTaskPool.submitTask(longMsgBody);
                    }
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }).start();
        }
    }
}

server端省略

相关文章: