NettyServer.java
1 package com.atguigu.netty.simple; 2 3 import io.netty.bootstrap.ServerBootstrap; 4 import io.netty.channel.*; 5 import io.netty.channel.nio.NioEventLoopGroup; 6 import io.netty.channel.socket.SocketChannel; 7 import io.netty.channel.socket.nio.NioServerSocketChannel; 8 import io.netty.channel.socket.nio.NioSocketChannel; 9 10 public class NettyServer { 11 public static void main(String[] args) throws Exception { 12 13 14 //创建BossGroup 和 WorkerGroup 15 //说明 16 //1. 创建两个线程组 bossGroup 和 workerGroup 17 //2. bossGroup 只是处理连接请求 , 真正的和客户端业务处理,会交给 workerGroup完成 18 //3. 两个都是无限循环 19 //4. bossGroup 和 workerGroup 含有的子线程(NioEventLoop)的个数 20 // 默认实际 cpu核数 * 2 21 EventLoopGroup bossGroup = new NioEventLoopGroup(1); 22 EventLoopGroup workerGroup = new NioEventLoopGroup(); //8 23 24 25 26 try { 27 //创建服务器端的启动对象,配置参数 28 ServerBootstrap bootstrap = new ServerBootstrap(); 29 30 //使用链式编程来进行设置 31 bootstrap.group(bossGroup, workerGroup) //设置两个线程组 32 .channel(NioServerSocketChannel.class) //使用NioSocketChannel 作为服务器的通道实现 33 .option(ChannelOption.SO_BACKLOG, 128) // 设置线程队列得到连接个数 34 .childOption(ChannelOption.SO_KEEPALIVE, true) //设置保持活动连接状态 35 // .handler(null) // 该 handler对应 bossGroup , childHandler 对应 workerGroup 36 .childHandler(new ChannelInitializer<SocketChannel>() {//创建一个通道初始化对象(匿名对象) 37 //给pipeline 设置处理器 38 @Override 39 protected void initChannel(SocketChannel ch) throws Exception { 40 System.out.println("客户socketchannel hashcode=" + ch.hashCode()); //可以使用一个集合管理 SocketChannel, 再推送消息时,可以将业务加入到各个channel 对应的 NIOEventLoop 的 taskQueue 或者 scheduleTaskQueue 41 ch.pipeline().addLast(new NettyServerHandler()); 42 } 43 }); // 给我们的workerGroup 的 EventLoop 对应的管道设置处理器 44 45 System.out.println(".....服务器 is ready..."); 46 47 //绑定一个端口并且同步, 生成了一个 ChannelFuture 对象 48 //启动服务器(并绑定端口) 49 ChannelFuture cf = bootstrap.bind(6668).sync(); 50 51 //给cf 注册监听器,监控我们关心的事件 52 53 cf.addListener(new ChannelFutureListener() { 54 @Override 55 public void operationComplete(ChannelFuture future) throws Exception { 56 if (cf.isSuccess()) { 57 System.out.println("监听端口 6668 成功"); 58 } else { 59 System.out.println("监听端口 6668 失败"); 60 } 61 } 62 }); 63 64 65 //对关闭通道进行监听 66 cf.channel().closeFuture().sync(); 67 }finally { 68 bossGroup.shutdownGracefully(); 69 workerGroup.shutdownGracefully(); 70 } 71 72 } 73 74 }
NettyServerHandler.java
1 package com.atguigu.netty.simple; 2 3 import io.netty.buffer.ByteBuf; 4 import io.netty.buffer.Unpooled; 5 import io.netty.channel.Channel; 6 import io.netty.channel.ChannelHandlerContext; 7 import io.netty.channel.ChannelInboundHandlerAdapter; 8 import io.netty.channel.ChannelPipeline; 9 import io.netty.util.CharsetUtil; 10 11 import java.util.concurrent.TimeUnit; 12 13 /* 14 说明 15 1. 我们自定义一个Handler 需要继续netty 规定好的某个HandlerAdapter(规范) 16 2. 这时我们自定义一个Handler , 才能称为一个handler 17 */ 18 public class NettyServerHandler extends ChannelInboundHandlerAdapter { 19 20 //读取数据实际(这里我们可以读取客户端发送的消息) 21 /* 22 1. ChannelHandlerContext ctx:上下文对象, 含有 管道pipeline , 通道channel, 地址 23 2. Object msg: 就是客户端发送的数据 默认Object 24 */ 25 @Override 26 public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { 27 28 /* 29 30 //比如这里我们有一个非常耗时长的业务-> 异步执行 -> 提交该channel 对应的 31 //NIOEventLoop 的 taskQueue中, 32 33 //解决方案1 用户程序自定义的普通任务 34 35 ctx.channel().eventLoop().execute(new Runnable() { 36 @Override 37 public void run() { 38 39 try { 40 Thread.sleep(5 * 1000); 41 ctx.writeAndFlush(Unpooled.copiedBuffer("hello, 客户端~(>^ω^<)喵2", CharsetUtil.UTF_8)); 42 System.out.println("channel code=" + ctx.channel().hashCode()); 43 } catch (Exception ex) { 44 System.out.println("发生异常" + ex.getMessage()); 45 } 46 } 47 }); 48 49 ctx.channel().eventLoop().execute(new Runnable() { 50 @Override 51 public void run() { 52 53 try { 54 Thread.sleep(5 * 1000); 55 ctx.writeAndFlush(Unpooled.copiedBuffer("hello, 客户端~(>^ω^<)喵3", CharsetUtil.UTF_8)); 56 System.out.println("channel code=" + ctx.channel().hashCode()); 57 } catch (Exception ex) { 58 System.out.println("发生异常" + ex.getMessage()); 59 } 60 } 61 }); 62 63 //解决方案2 : 用户自定义定时任务 -》 该任务是提交到 scheduleTaskQueue中 64 65 ctx.channel().eventLoop().schedule(new Runnable() { 66 @Override 67 public void run() { 68 69 try { 70 Thread.sleep(5 * 1000); 71 ctx.writeAndFlush(Unpooled.copiedBuffer("hello, 客户端~(>^ω^<)喵4", CharsetUtil.UTF_8)); 72 System.out.println("channel code=" + ctx.channel().hashCode()); 73 } catch (Exception ex) { 74 System.out.println("发生异常" + ex.getMessage()); 75 } 76 } 77 }, 5, TimeUnit.SECONDS); 78 79 80 81 System.out.println("go on ...");*/ 82 83 84 System.out.println("服务器读取线程 " + Thread.currentThread().getName() + " channle =" + ctx.channel()); 85 System.out.println("server ctx =" + ctx); 86 System.out.println("看看channel 和 pipeline的关系"); 87 Channel channel = ctx.channel(); 88 ChannelPipeline pipeline = ctx.pipeline(); //本质是一个双向链接, 出站入站 89 90 91 //将 msg 转成一个 ByteBuf 92 //ByteBuf 是 Netty 提供的,不是 NIO 的 ByteBuffer. 93 ByteBuf buf = (ByteBuf) msg; 94 System.out.println("客户端发送消息是:" + buf.toString(CharsetUtil.UTF_8)); 95 System.out.println("客户端地址:" + channel.remoteAddress()); 96 } 97 98 //数据读取完毕 99 @Override 100 public void channelReadComplete(ChannelHandlerContext ctx) throws Exception { 101 102 //writeAndFlush 是 write + flush 103 //将数据写入到缓存,并刷新 104 //一般讲,我们对这个发送的数据进行编码 105 ctx.writeAndFlush(Unpooled.copiedBuffer("hello, 客户端~(>^ω^<)喵1", CharsetUtil.UTF_8)); 106 } 107 108 //处理异常, 一般是需要关闭通道 109 110 @Override 111 public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { 112 ctx.close(); 113 } 114 }
NettyClient.java
1 package com.atguigu.netty.simple; 2 3 import io.netty.bootstrap.Bootstrap; 4 import io.netty.channel.ChannelFuture; 5 import io.netty.channel.ChannelInitializer; 6 import io.netty.channel.EventLoopGroup; 7 import io.netty.channel.nio.NioEventLoopGroup; 8 import io.netty.channel.socket.SocketChannel; 9 import io.netty.channel.socket.nio.NioSocketChannel; 10 11 public class NettyClient { 12 public static void main(String[] args) throws Exception { 13 14 //客户端需要一个事件循环组 15 EventLoopGroup group = new NioEventLoopGroup(); 16 17 18 try { 19 //创建客户端启动对象 20 //注意客户端使用的不是 ServerBootstrap 而是 Bootstrap 21 Bootstrap bootstrap = new Bootstrap(); 22 23 //设置相关参数 24 bootstrap.group(group) //设置线程组 25 .channel(NioSocketChannel.class) // 设置客户端通道的实现类(反射) 26 .handler(new ChannelInitializer<SocketChannel>() { 27 @Override 28 protected void initChannel(SocketChannel ch) throws Exception { 29 ch.pipeline().addLast(new NettyClientHandler()); //加入自己的处理器 30 } 31 }); 32 33 System.out.println("客户端 ok.."); 34 35 //启动客户端去连接服务器端 36 //关于 ChannelFuture 要分析,涉及到netty的异步模型 37 ChannelFuture channelFuture = bootstrap.connect("127.0.0.1", 6668).sync(); 38 //给关闭通道进行监听 39 channelFuture.channel().closeFuture().sync(); 40 }finally { 41 42 group.shutdownGracefully(); 43 44 } 45 } 46 }
pipeline与channel是相互包含的关系