dymcl

NettyServer.java
 1 package com.atguigu.netty.simple;
 2 
 3 import io.netty.bootstrap.ServerBootstrap;
 4 import io.netty.channel.*;
 5 import io.netty.channel.nio.NioEventLoopGroup;
 6 import io.netty.channel.socket.SocketChannel;
 7 import io.netty.channel.socket.nio.NioServerSocketChannel;
 8 import io.netty.channel.socket.nio.NioSocketChannel;
 9 
10 public class NettyServer {
11     public static void main(String[] args) throws Exception {
12 
13 
14         //创建BossGroup 和 WorkerGroup
15         //说明
16         //1. 创建两个线程组 bossGroup 和 workerGroup
17         //2. bossGroup 只是处理连接请求 , 真正的和客户端业务处理,会交给 workerGroup完成
18         //3. 两个都是无限循环
19         //4. bossGroup 和 workerGroup 含有的子线程(NioEventLoop)的个数
20         //   默认实际 cpu核数 * 2
21         EventLoopGroup bossGroup = new NioEventLoopGroup(1);
22         EventLoopGroup workerGroup = new NioEventLoopGroup(); //8
23 
24 
25 
26         try {
27             //创建服务器端的启动对象,配置参数
28             ServerBootstrap bootstrap = new ServerBootstrap();
29 
30             //使用链式编程来进行设置
31             bootstrap.group(bossGroup, workerGroup) //设置两个线程组
32                     .channel(NioServerSocketChannel.class) //使用NioSocketChannel 作为服务器的通道实现
33                     .option(ChannelOption.SO_BACKLOG, 128) // 设置线程队列得到连接个数
34                     .childOption(ChannelOption.SO_KEEPALIVE, true) //设置保持活动连接状态
35 //                    .handler(null) // 该 handler对应 bossGroup , childHandler 对应 workerGroup
36                     .childHandler(new ChannelInitializer<SocketChannel>() {//创建一个通道初始化对象(匿名对象)
37                         //给pipeline 设置处理器
38                         @Override
39                         protected void initChannel(SocketChannel ch) throws Exception {
40                             System.out.println("客户socketchannel hashcode=" + ch.hashCode()); //可以使用一个集合管理 SocketChannel, 再推送消息时,可以将业务加入到各个channel 对应的 NIOEventLoop 的 taskQueue 或者 scheduleTaskQueue
41                             ch.pipeline().addLast(new NettyServerHandler());
42                         }
43                     }); // 给我们的workerGroup 的 EventLoop 对应的管道设置处理器
44 
45             System.out.println(".....服务器 is ready...");
46 
47             //绑定一个端口并且同步, 生成了一个 ChannelFuture 对象
48             //启动服务器(并绑定端口)
49             ChannelFuture cf = bootstrap.bind(6668).sync();
50 
51             //给cf 注册监听器,监控我们关心的事件
52 
53             cf.addListener(new ChannelFutureListener() {
54                 @Override
55                 public void operationComplete(ChannelFuture future) throws Exception {
56                     if (cf.isSuccess()) {
57                         System.out.println("监听端口 6668 成功");
58                     } else {
59                         System.out.println("监听端口 6668 失败");
60                     }
61                 }
62             });
63 
64 
65             //对关闭通道进行监听
66             cf.channel().closeFuture().sync();
67         }finally {
68             bossGroup.shutdownGracefully();
69             workerGroup.shutdownGracefully();
70         }
71 
72     }
73 
74 }



NettyServerHandler.java
  1 package com.atguigu.netty.simple;
  2 
  3 import io.netty.buffer.ByteBuf;
  4 import io.netty.buffer.Unpooled;
  5 import io.netty.channel.Channel;
  6 import io.netty.channel.ChannelHandlerContext;
  7 import io.netty.channel.ChannelInboundHandlerAdapter;
  8 import io.netty.channel.ChannelPipeline;
  9 import io.netty.util.CharsetUtil;
 10 
 11 import java.util.concurrent.TimeUnit;
 12 
 13 /*
 14 说明
 15 1. 我们自定义一个Handler 需要继续netty 规定好的某个HandlerAdapter(规范)
 16 2. 这时我们自定义一个Handler , 才能称为一个handler
 17  */
 18 public class NettyServerHandler extends ChannelInboundHandlerAdapter {
 19 
 20     //读取数据实际(这里我们可以读取客户端发送的消息)
 21     /*
 22     1. ChannelHandlerContext ctx:上下文对象, 含有 管道pipeline , 通道channel, 地址
 23     2. Object msg: 就是客户端发送的数据 默认Object
 24      */
 25     @Override
 26     public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
 27 
 28 /*
 29 
 30         //比如这里我们有一个非常耗时长的业务-> 异步执行 -> 提交该channel 对应的
 31         //NIOEventLoop 的 taskQueue中,
 32 
 33         //解决方案1 用户程序自定义的普通任务
 34 
 35         ctx.channel().eventLoop().execute(new Runnable() {
 36             @Override
 37             public void run() {
 38 
 39                 try {
 40                     Thread.sleep(5 * 1000);
 41                     ctx.writeAndFlush(Unpooled.copiedBuffer("hello, 客户端~(>^ω^<)喵2", CharsetUtil.UTF_8));
 42                     System.out.println("channel code=" + ctx.channel().hashCode());
 43                 } catch (Exception ex) {
 44                     System.out.println("发生异常" + ex.getMessage());
 45                 }
 46             }
 47         });
 48 
 49         ctx.channel().eventLoop().execute(new Runnable() {
 50             @Override
 51             public void run() {
 52 
 53                 try {
 54                     Thread.sleep(5 * 1000);
 55                     ctx.writeAndFlush(Unpooled.copiedBuffer("hello, 客户端~(>^ω^<)喵3", CharsetUtil.UTF_8));
 56                     System.out.println("channel code=" + ctx.channel().hashCode());
 57                 } catch (Exception ex) {
 58                     System.out.println("发生异常" + ex.getMessage());
 59                 }
 60             }
 61         });
 62 
 63         //解决方案2 : 用户自定义定时任务 -》 该任务是提交到 scheduleTaskQueue中
 64 
 65         ctx.channel().eventLoop().schedule(new Runnable() {
 66             @Override
 67             public void run() {
 68 
 69                 try {
 70                     Thread.sleep(5 * 1000);
 71                     ctx.writeAndFlush(Unpooled.copiedBuffer("hello, 客户端~(>^ω^<)喵4", CharsetUtil.UTF_8));
 72                     System.out.println("channel code=" + ctx.channel().hashCode());
 73                 } catch (Exception ex) {
 74                     System.out.println("发生异常" + ex.getMessage());
 75                 }
 76             }
 77         }, 5, TimeUnit.SECONDS);
 78 
 79 
 80 
 81         System.out.println("go on ...");*/
 82 
 83 
 84         System.out.println("服务器读取线程 " + Thread.currentThread().getName() + " channle =" + ctx.channel());
 85         System.out.println("server ctx =" + ctx);
 86         System.out.println("看看channel 和 pipeline的关系");
 87         Channel channel = ctx.channel();
 88         ChannelPipeline pipeline = ctx.pipeline(); //本质是一个双向链接, 出站入站
 89 
 90 
 91         //将 msg 转成一个 ByteBuf
 92         //ByteBuf 是 Netty 提供的,不是 NIO 的 ByteBuffer.
 93         ByteBuf buf = (ByteBuf) msg;
 94         System.out.println("客户端发送消息是:" + buf.toString(CharsetUtil.UTF_8));
 95         System.out.println("客户端地址:" + channel.remoteAddress());
 96     }
 97 
 98     //数据读取完毕
 99     @Override
100     public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
101 
102         //writeAndFlush 是 write + flush
103         //将数据写入到缓存,并刷新
104         //一般讲,我们对这个发送的数据进行编码
105         ctx.writeAndFlush(Unpooled.copiedBuffer("hello, 客户端~(>^ω^<)喵1", CharsetUtil.UTF_8));
106     }
107 
108     //处理异常, 一般是需要关闭通道
109 
110     @Override
111     public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
112         ctx.close();
113     }
114 }

NettyClient.java
 1 package com.atguigu.netty.simple;
 2 
 3 import io.netty.bootstrap.Bootstrap;
 4 import io.netty.channel.ChannelFuture;
 5 import io.netty.channel.ChannelInitializer;
 6 import io.netty.channel.EventLoopGroup;
 7 import io.netty.channel.nio.NioEventLoopGroup;
 8 import io.netty.channel.socket.SocketChannel;
 9 import io.netty.channel.socket.nio.NioSocketChannel;
10 
11 public class NettyClient {
12     public static void main(String[] args) throws Exception {
13 
14         //客户端需要一个事件循环组
15         EventLoopGroup group = new NioEventLoopGroup();
16 
17 
18         try {
19             //创建客户端启动对象
20             //注意客户端使用的不是 ServerBootstrap 而是 Bootstrap
21             Bootstrap bootstrap = new Bootstrap();
22 
23             //设置相关参数
24             bootstrap.group(group) //设置线程组
25                     .channel(NioSocketChannel.class) // 设置客户端通道的实现类(反射)
26                     .handler(new ChannelInitializer<SocketChannel>() {
27                         @Override
28                         protected void initChannel(SocketChannel ch) throws Exception {
29                             ch.pipeline().addLast(new NettyClientHandler()); //加入自己的处理器
30                         }
31                     });
32 
33             System.out.println("客户端 ok..");
34 
35             //启动客户端去连接服务器端
36             //关于 ChannelFuture 要分析,涉及到netty的异步模型
37             ChannelFuture channelFuture = bootstrap.connect("127.0.0.1", 6668).sync();
38             //给关闭通道进行监听
39             channelFuture.channel().closeFuture().sync();
40         }finally {
41 
42             group.shutdownGracefully();
43 
44         }
45     }
46 }

 

 

 

 

 

 




 

 

 

 pipeline与channel是相互包含的关系

分类:

技术点:

相关文章:

  • 2022-02-25
  • 2022-12-23
  • 2021-09-05
  • 2022-01-11
  • 2022-12-23
  • 2021-12-24
猜你喜欢
  • 2021-11-06
  • 2021-06-01
  • 2021-07-03
  • 2021-08-23
  • 2021-04-07
相关资源
相似解决方案