工作原理示意图 1-简单版
Netty 主要基于主从 Reactors 多线程模型(如图) 做了一定的改进, 其中主从 Reactor 多线程模型有多个 Reactor
对上图说明
1) BossGroup 线程维护 Selector , 只关注 Accecpt。
2) 当接收到 Accept 事件, 获取到对应的 SocketChannel, 封装成 NIOScoketChannel 并注册到 Worker 线程(事件循环), 并进行维护。
3) 当 Worker 线程监听到 selector 中通道发生自己感兴趣的事件后, 就进行处理(就由 handler), 注意 handler 已经加入到通道 。
工作原理示意图 2-进阶版
工作原理示意图 3 - 详细版
对上图的说明小结
1) Netty 抽象出两组线程池 BossGroup 专门负责接收客户端的连接, WorkerGroup 专门负责网络的读写
2) BossGroup 和 WorkerGroup 类型都是 NioEventLoopGroup
3) NioEventLoopGroup 相当于一个事件循环组, 这个组中含有多个事件循环 , 每一个事件循环是 NioEventLoop
4) NioEventLoop 表示一个不断循环的执行处理任务的线程, 每个 NioEventLoop 都有一个 selector , 用于监听绑定在其上的 socket 的网络通讯
5) NioEventLoopGroup 可以有多个线程, 即可以含有多个 NioEventLoop
6) 每个 Boss NioEventLoop 循环执行的步骤有 3 步
- 轮询 accept 事件
- 处理 accept 事件 , 与 client 建立连接 , 生成 NioScocketChannel , 并将其注册到某个 worker NIOEventLoop 上的 selector
- 处理任务队列的任务 , 即 runAllTasks
8) 每个Worker NIOEventLoop 处理业务时, 会使用pipeline(管道), pipeline 中包含了 channel (通道), 即通过pipeline可以获取到对应通道, 管道中维护了很多的 处理器
自己总结:pipeline(管道)含有多个ChanelHandler(看上面图)
Netty 快速入门实例-TCP 服务
NettyServer
import io.netty.bootstrap.ServerBootstrap; import io.netty.channel.*; import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.socket.SocketChannel; import io.netty.channel.socket.nio.NioServerSocketChannel; import io.netty.channel.socket.nio.NioSocketChannel; public class NettyServer { public static void main(String[] args) throws Exception { //创建BossGroup 和 WorkerGroup //说明 //1. 创建两个线程组 bossGroup 和 workerGroup //2. bossGroup 只是处理连接请求 , 真正的和客户端业务处理,会交给 workerGroup完成 //3. 两个都是无限循环 //4. bossGroup 和 workerGroup 含有的子线程(NioEventLoop)的个数 // 默认实际 cpu核数 * 2 EventLoopGroup bossGroup = new NioEventLoopGroup(1); EventLoopGroup workerGroup = new NioEventLoopGroup(); //8 try { //创建服务器端的启动对象,配置参数 ServerBootstrap bootstrap = new ServerBootstrap(); //使用链式编程来进行设置 bootstrap.group(bossGroup, workerGroup) //设置两个线程组 .channel(NioServerSocketChannel.class) //使用NioSocketChannel 作为服务器的通道实现 .option(ChannelOption.SO_BACKLOG, 128) // 设置线程队列得到连接个数 .childOption(ChannelOption.SO_KEEPALIVE, true) //设置保持活动连接状态 // .handler(null) // 该 handler对应 bossGroup , childHandler 对应 workerGroup .childHandler(new ChannelInitializer<SocketChannel>() {//创建一个通道初始化对象(匿名对象) //给pipeline 设置处理器 @Override protected void initChannel(SocketChannel ch) throws Exception { System.out.println("客户socketchannel hashcode=" + ch.hashCode()); //可以使用一个集合管理 SocketChannel, 再推送消息时,可以将业务加入到各个channel 对应的 NIOEventLoop 的 taskQueue 或者 scheduleTaskQueue ch.pipeline().addLast(new NettyServerHandler()); } }); // 给我们的workerGroup 的 EventLoop 对应的管道设置处理器 System.out.println(".....服务器 is ready..."); //绑定一个端口并且同步, 生成了一个 ChannelFuture 对象 //启动服务器(并绑定端口) ChannelFuture cf = bootstrap.bind(6668).sync(); //给cf 注册监听器,监控我们关心的事件 cf.addListener(new ChannelFutureListener() { @Override public void operationComplete(ChannelFuture future) throws Exception { if (cf.isSuccess()) { System.out.println("监听端口 6668 成功"); } else { System.out.println("监听端口 6668 失败"); } } }); //对关闭通道进行监听 cf.channel().closeFuture().sync(); }finally { bossGroup.shutdownGracefully(); workerGroup.shutdownGracefully(); } } }