前面,我们学习了 Netty 的基本 原理和架构 ,今天我们来大致了解一下 Netty 中的各个组件。
同我们 前面 学习 IO 与 NIO 一样的套路,我们先通过 echo 服务 demo 来学习 netty 的使用。
开发环境
- JDK >= 8
- Netty 4.1.29.Final
编写 Echo Server 代码
Netty 服务端的开发主要有以下两个步骤:
- 至少有一个 ChannelHandler —— 这个主要用于处理从 client 端接受到的信息,是主要的业务逻辑处理类。
- Bootstrapping —— 用于配置服务的启动代码。最简单的就是,监听一个端口。
实现 EchoServerHandler 逻辑
服务端用于处理入站的网络请求,因此我们需要实现接口类 ChannelInboundHandler,它里面定义了用于
处理入站请求的一些接口。由于我们这个例子比较简单,只需要用到它的几个方法即可,因此我们的实现类只需要继承子类 ChannelInboundHandlerAdapter 即可,它默认实现了 ChannelInboundHandler 中的接口。
有几个方法需要留意一下:
- channelRead() —— 每当有入站请求来临时,该方法都会被调用
- channelReadComplete() —— 对 channelRead () 的最后一次调用是当前批处理中的最后一条消息时,该方法会被调用
- exceptionCaught() —— 在 read 操作执行期间,如果发生异常,该方法则会被调用。
代码如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 |
// @Sharable象征着该ChannelHandler实例在多个channels之间可以被安全地分享
@Sharable
public class EchoServerHandler extends ChannelInboundHandlerAdapter {
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) {
ByteBuf in = (ByteBuf) msg;
// 打印消息日志
System.out.println("Server received: " + in.toString(CharsetUtil.UTF_8));
// 将入站消息发送给发送者,但不冲刷出站消息
ctx.write(in);
}
@Override
public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
// 将待处理的消息冲刷到远程节点上,并关闭Channel
ctx.writeAndFlush(Unpooled.EMPTY_BUFFER)
.addListener(ChannelFutureListener.CLOSE);
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
// 打印堆栈信息
cause.printStackTrace();
// 关闭channel
ctx.close();
}
}
|
实现 EchoServer 逻辑
接下来,我们使用 ServerBootstrap 来实现服务端的开发,主要以下两点:
- 绑定一个监听端口
- 配置 Channels,当有入站消息到达时,通知 EchoServerHeadler 实例。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 |
package nia.chapter2.echoserver;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import java.net.InetSocketAddress;
public class EchoServer {
private final int port;
public EchoServer(int port) {
this.port = port;
}
public static void main(String[] args) throws Exception {
if (args.length != 1) {
System.err.println("Usage: " + EchoServer.class.getSimpleName() + " <port>");
return;
}
int port = Integer.parseInt(args[0]);
new EchoServer(port).start();
}
public void start() throws Exception {
final EchoServerHandler serverHandler = new EchoServerHandler();
// 创建 EventLoopGroup 实例
EventLoopGroup group = new NioEventLoopGroup();
try {
// 创建 ServerBootstrap 实例
ServerBootstrap b = new ServerBootstrap();
b.group(group)
// 执行Channel的类型为:NioServerSocketChannel
.channel(NioServerSocketChannel.class)
// 绑定端口
.localAddress(new InetSocketAddress(port))
// 将EchoServerHandler添加到ChannelPipeline中去
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
public void initChannel(SocketChannel ch) throws Exception {
// EchoServerHandler有注解 @Sharable,因此我们总是可以使用改实例
ch.pipeline().addLast(serverHandler);
}
});
// 异步绑定服务,sync() 用于等待绑定完成
ChannelFuture f = b.bind().sync();
System.out.println(EchoServer.class.getName() +
" started and listening for connections on " + f.channel().localAddress());
// 获取Channel的CloseFutrue,在完成之前一直处于阻塞状态
f.channel().closeFuture().sync();
} finally {
// 关闭所有 EventLoopGroup,并释放所有资源
group.shutdownGracefully().sync();
}
}
}
|
编写 Echo Client 代码
Echo Client 代码逻辑:
- 连接服务器
- 发送一个或多个消息
- 等待服务端返回同样的消息
- 关闭连接
实现 EchoClientHandler 逻辑
同服务端一样,客户端也要实现 ChannelInboundHandler 接口,客户端需要继承 SimpleChannelInboundHandler ,有以下三个接口需要重写:
- channelActive() —— 当连接建立时,调用该方法
- channelRead0() —— 当接收到服务端的消息时,调用该方法
- exceptionCaught() —— 当有异常发生时,执行该方法
代码如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 |
package nia.chapter2.echoclient;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandler.Sharable;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.util.CharsetUtil;
// @Sharable用于标记EchoClientHandler,可以在channel中分享使用
@Sharable
public class EchoClientHandler extends SimpleChannelInboundHandler<ByteBuf> {
@Override
public void channelActive(ChannelHandlerContext ctx) {
// 一旦连接建立,将会发送消息
ctx.writeAndFlush(Unpooled.copiedBuffer("Netty rocks!",CharsetUtil.UTF_8));
}
@Override
public void channelRead0(ChannelHandlerContext ctx, ByteBuf in) {
// 记录收到的消息
System.out.println("Client received: " + in.toString(CharsetUtil.UTF_8));
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
// 打印堆栈信息
cause.printStackTrace();
// 关闭channel
ctx.close();
}
}
|
实现 EchoClient 逻辑
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 |
package nia.chapter2.echoclient;
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import java.net.InetSocketAddress;
public class EchoClient {
private final String host;
private final int port;
public EchoClient(String host, int port) {
this.host = host;
this.port = port;
}
public void start()
throws Exception {
EventLoopGroup group = new NioEventLoopGroup();
try {
// 创建Bootstrap
Bootstrap b = new Bootstrap();
// 指定使用 NioEventLoopGroup 去处理客户端事件
b.group(group)
// 指定channel类型为NIO
.channel(NioSocketChannel.class)
// 指定要连接的远程地址
.remoteAddress(new InetSocketAddress(host, port))
// 将 EchoClientHandler 添加到 pipeline中
.handler(new ChannelInitializer<SocketChannel>() {
@Override
public void initChannel(SocketChannel ch) throws Exception {
ch.pipeline().addLast(new EchoClientHandler());
}
});
// 连接远程地址,一直等待直到连接完成
ChannelFuture f = b.connect().sync();
// 在channel关闭前一直处于block状态
f.channel().closeFuture().sync();
} finally {
// 关闭线程池,释放所有资源
group.shutdownGracefully().sync();
}
}
public static void main(String[] args)
throws Exception {
if (args.length != 2) {
System.err.println("Usage: " + EchoClient.class.getSimpleName() +
" <host> <port>"
);
return;
}
final String host = args[0];
final int port = Integer.parseInt(args[1]);
new EchoClient(host, port).start();
}
}
|
为什么 client 使用 SimpleChannelInboundHandler ,而 server 端使用 ChannelInboundHandlerAdapter 的区别 ?
在 Client 中,channelRead0 () 完成时,消息已经处理完。当该方法返回时,SimpleChannelInboundHandler 会释放保存该消息的 ByteBuf 的内存引用。
在 Server 中,接收完消息后,还需要将消息回传给客户端,并且 wirte () 是异步的,当 channelRead () 完成时,消息内存还没有被释放。需要等到 channelReadComplete () 中调用 writeAndFlush () 才会被释放。
Netty 组件
这里我们先简要了解一下以下几个组件的作用,留个映像,后面我们会对每个组件做详细深入。
Channel
同我们前面学习 Java NIO Channel 类似,Netty Channel 在此基础上做了高度抽象的封装,主要用于网络 I/O 数据的基本操作,如 bind ()、connect ()、read ()、write () 等。
EventLoop
在网络连接的整个生命周期内,发生的所有事件的处理主要有 EventLoop 来处理
ChannelFuture
在 Netty 中,I/O 操作主要都是异步进行,当操作发生时,我们需要通过一种方式来知道操作在未来的时间点的执行结果。ChannelFutrue 中的 addListener () 方法,可以注册监听器 ChannelFutureListener,当操作完成时,监听器可以主动通知我们。
ChannelHandler
channelHandler 主要用于应用程序中的业务逻辑的处理,网络中的进入与出去的数据都经由它处理,当有事件发生时,channelHandler 会被触发执行。
ChannelPipeline
ChannelPipeline 提供了一种容器,用于定义数据流入与流出过程中的处理流程。可以将 Pipeline 看作是一条流水线,原始的原料 (字节流) 进来,经过加工,最后输出。
Bootstrapping
主要用于配置服务端或客户端的 Netty 程序的启动信息。
ByteBuf
字节数据容器,提供比 Java NIO ByteBuffer 更好的的 API。