最近有很多网友在咨询netty client中,netty的channel连接池应该如何设计。这是个稍微有些复杂的主题,牵扯到蛮多技术点,要想在网上找到相关的又相对完整的参考文章,确实不太容易。

  在本篇文章中,会给出其中一种解决方案,并且附带完整的可运行的代码。如果网友有更好的方案,可以回复本文,我们一起讨论讨论,一起开阔思路和眼界。

  阅读本文之前需要具备一些基础知识

  1、知道netty的一些基础知识,比如ByteBuf类的相关api;

  2、知道netty的执行流程;

  3、 必须阅读过我之前写的netty实战-自定义解码器处理半包消息,因为本文部分代码来自这篇文章。

  现在微服务非常的热门,也有很多公司在用。微服务框架中,如果是使用thrift、grpc来作为数据序列化框架的话,通常都会生成一个SDK给客户端用户使用。客户端只要使用这个SDK,就可以方便的调用服务端的微服务接口。本文讨论的就是使用SDK的netty客户端,它的netty channel连接池的设计方案。至于netty http client的channel连接池设计,基于http的,是另外一个主题了,需要另外写文章来讨论的。

  netty channel连接池设计

  DB连接池中,当某个线程获取到一个db connection后,在读取数据或者写数据时,如果线程没有操作完,这个db connection一直被该线程独占着,直到线程执行完任务。如果netty client的channel连接池设计也是使用这种独占的方式的话,有几个问题。

  1、netty中channel的writeAndFlush方法,调用完后是不用等待返回结果的,writeAndFlush一被调用,马上返回。对于这种情况,是完全没必要让线程独占一个channel的。

  2、使用类似DB pool的方式,从池子里拿连接,用完后返回,这里的一进一出,需要考虑并发锁的问题。另外,如果请求量很大的时候,连接会不够用,其他线程也只能等待其他线程释放连接。

  因此不太建议使用上面的方式来设计netty channel连接池,channel独占的代价太大了。可以使用Channel数组的形式, 复用netty的channel。当线程要需要Channel的时候,随机从数组选中一个Channel,如果Channel还未建立,则创建一个。如果线程选中的Channel已经建立了,则复用这个Channel。

  

netty实战-netty client连接池设计

  假设channel数组的长度为4

  private Channel[] channels = new Channel[4];1

  当外部系统请求client的时候,client从channels数组中随机挑选一个channel,如果该channel尚未建立,则触发建立channel的逻辑。无论有多少请求,都是复用这4个channel。假设有10个线程,那么部分线程可能会使用相同的channel来发送数据和接收数据。因为是随机选择一个channel的,多个线程命中同一个channel的机率还是很大的。如下图

  

netty实战-netty client连接池设计

  10个线程中,可能有3个线程都是使用channel2来发送数据的。这个会引入另外一个问题。thread1通过channel2发送一条消息msg1到服务端,thread2也通过channel2发送一条消息msg2到服务端,当服务端处理完数据,通过channel2返回数据给客户端的时候,如何区分哪条消息是哪个线程的呢?如果不做区分,万一thread1拿到的结果其实是thread2要的结果,怎么办?

  那么如何做到让thread1和thread2拿到它们自己想要的结果呢?

  之前我在netty实战-自定义解码器处理半包消息一文中提到,自定义消息的时候,通常会在消息中加入一个***,用来唯一标识消息的。当thread1发送消息时,往消息中插入一个唯一的消息***,同时为thread1建立一个callback回调程序,当服务端返回消息的时候,根据消息中的***从对应的callback程序获取结果。这样就可以解决上面说到的问题。

  消息格式

  

netty实战-netty client连接池设计

  消息、消息seq以及callback对应关系

  

netty实战-netty client连接池设计

  

netty实战-netty client连接池设计

  OK,下面就基于上面的设计来进行编码。

  代码

  先来实现netty客户端,设置10个线程并发获取channel,为了达到真正的并发,利用CountDownLatch来做开关,同时channel连接池设置4个channel。

  package nettyinaction.nettyclient.channelpool.client;

  import io.netty.buffer.ByteBuf;

  import io.netty.buffer.UnpooledByteBufAllocator;

  import io.netty.channel.Channel;

  import nettyinaction.nettyclient.channelpool.ChannelUtils;

  import nettyinaction.nettyclient.channelpool.IntegerFactory;

  import java.util.HashMap;

  import java.util.Map;

  import java.util.concurrent.CountDownLatch;

  public class SocketClient {

  public static void main(String[] args) throws InterruptedException {

  //当所有线程都准备后,开闸,让所有线程并发的去获取netty的channel

  final CountDownLatch countDownLatchBegin = new CountDownLatch(1);

  //当所有线程都执行完任务后,释放主线程,让主线程继续执行下去

  final CountDownLatch countDownLatchEnd = new CountDownLatch(10);

  //netty channel池

  final NettyChannelPool nettyChannelPool = new NettyChannelPool();

  final Map resultsMap = new HashMap<>();

  //使用10个线程,并发的去获取netty channel

  for (int i = 0; i < 10; i++) {

  new Thread(new Runnable() {

  @Override

  public void run() {

  try {

  //先让线程block住

  countDownLatchBegin.await();

  Channel channel = null;

  try {

  channel = nettyChannelPool.syncGetChannel();

  } catch (InterruptedException e) {

  e.printStackTrace();

  }

  //为每个线程建立一个callback,当消息返回的时候,在callback中获取结果

  CallbackService callbackService = new CallbackService();

  //给消息分配一个唯一的消息***

  int seq = IntegerFactory.getInstance().incrementAndGet();

  //利用Channel的attr方法,建立消息与callback的对应关系

  ChannelUtils.putCallback2DataMap(channel,seq,callbackService);

  synchronized (callbackService) {

  UnpooledByteBufAllocator allocator = new UnpooledByteBufAllocator(false);

  ByteBuf buffer = allocator.buffer(20);

  buffer.writeInt(ChannelUtils.MESSAGE_LENGTH);

  buffer.writeInt(seq);

  String threadName = Thread.currentThread().getName();

  buffer.writeBytes(threadName.getBytes());

  buffer.writeBytes("body".getBytes());

  //给netty 服务端发送消息,异步的,该方法会立刻返回

  channel.writeAndFlush(buffer);

  //等待返回结果

  callbackService.wait();

  //解析结果,这个result在callback中已经解析到了。

  ByteBuf result = callbackService.result;

  int length = result.readInt();

  int seqFromServer = result.readInt();

  byte[] head = new byte[8];

  result.readBytes(head);

  String headString = new String(head);

  byte[] body = new byte[4];

  result.readBytes(body);

  String bodyString = new String(body);

  resultsMap.put(threadName, seqFromServer + headString + bodyString);

  }

  } catch (Exception e) {

  e.printStackTrace();

  }

  finally {

  countDownLatchEnd.countDown();

  }

  }

  }).start();

  }

  //开闸,让10个线程并发获取netty channel

  countDownLatchBegin.countDown();

  //等10个线程执行完后,打印最终结果

  countDownLatchEnd.await();

  System.out.println("resultMap="+resultsMap);

  }

  public static class CallbackService{

  public volatile ByteBuf result;

  public void receiveMessage(ByteBuf receiveBuf) throws Exception {

  synchronized (this) {

  result = receiveBuf;

  this.notify();

  }

  }

  }

  }

  123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106

  其中IntegerFactory类用于生成消息的唯一***

  package nettyinaction.nettyclient.channelpool;

  import java.util.concurrent.atomic.AtomicInteger;

  public class IntegerFactory {

  private static class SingletonHolder {

  private static final AtomicInteger INSTANCE = new AtomicInteger();

  }

  private IntegerFactory(){}

  public static final AtomicInteger getInstance() {

  return SingletonHolder.INSTANCE;

  }

  }

  1234567891011121314151617

  而ChannelUtils类则用于建立channel、消息***和callback程序的对应关系。

  package nettyinaction.nettyclient.channelpool;

  import io.netty.channel.Channel;

  import io.netty.util.AttributeKey;

  import java.util.Map;

  public class ChannelUtils {

  public static final int MESSAGE_LENGTH = 16;

  public static final AttributeKey> DATA_MAP_ATTRIBUTEKEY = AttributeKey.valueOf("dataMap");

  public static void putCallback2DataMap(Channel channel, int seq, T callback) {

  channel.attr(DATA_MAP_ATTRIBUTEKEY).get().put(seq, callback);

  }

  public static T removeCallback(Channel channel, int seq) {

  return (T) channel.attr(DATA_MAP_ATTRIBUTEKEY).get().remove(seq);

  }

  }




文章来源:宁波整形美容http://www.iyestar.com/

相关文章: