一.netty基本概念:
netty是一个异步的事件驱动的网络应用框架,用于可维护的、高性能协议的服务端和客户端的快速开seda:发把一个请求分为若干个个stages,每一个stages可以用
不同的线程进行处理,不同的stages采用了事件驱动的异步沟通方式进行通信
1.可以作为一个rpc框架,2.作为一个长连接的服务器,3.作为一个http的服务器(类似于springmvc,struts2框架编写的web应用但是并没有实现一个servlet的规范)
二.基本使用:
netty的一些基本代码实例,学习新技术需要成就感,因此要先会用,然后研究原理。
三.netty与Nio:
1.io模型:
(1)阻塞 IO 模型 :
(2)非阻塞io模型:
(3)多路复用的io模型:
(4)信号驱动 IO 模型 :
(5)异步 IO 模型:
操作是如何
2.nio组件:
- buffer
buffer本身是一块内存,底层实现上,实际上是个数组,数据的读和写是通过buffer来实现的
nio可以允许buffer实现读写的切换,buffer.filp();//读写切换的开关
所有数据的读写都是通过buffer来进行的,永远不会出现直接向channel读取或者写入的情况
public static void main(String [] args)throws Exception{
FileOutputStream fileOutputStream = new FileOutputStream("aa.txt");
FileChannel fileChannel = fileOutputStream.getChannel();
ByteBuffer byteBuffer = ByteBuffer.allocate(512);
byte[] message = "hello world ".getBytes();
for(int i=0;i<message.length;i++){
byteBuffer.put(message[i]);
}
byteBuffer.flip();
fileChannel.write(byteBuffer);
fileOutputStream.close();
}
三个属性:position、limit、capacity的解释:
public static void main(String [] args)throws Exception{
ByteBuffer byteBuffer = ByteBuffer.allocate(512);
byte[] message = "hello world ".getBytes();
byteBuffer.put(message);
System.out.println("初始状态---"+byteBuffer.position()+":"+byteBuffer.limit());
byteBuffer.flip();
while (byteBuffer.remaining()>0){
byte b = byteBuffer.get();
System.out.println(b);
}
byteBuffer.flip();
System.out.println("结束状态---"+byteBuffer.position()+":"+byteBuffer.limit());
byte[] message2 = "hello world hello ".getBytes();
byteBuffer.put(message2);
System.out.println("结束状态2---"+byteBuffer.position()+":"+byteBuffer.limit());
}
(1)DirectBuffer:
调用native方法即本地方法在堆外内存生成directBuffer,能够提升效率
如果使用HeapByteBuffer,当与外设进行交互的时候会将jvm中的数据对象拷贝到堆外内存,多了一个拷贝的过程
为什么要拷贝?
虽然操作系统可以访问到堆上的内存但是
考虑到gc的时候如果正在与外设进行io交互,对于标记清除算法的jvm,进行gc的时候会出现混乱
并且堆上维护了一个address,对应了堆外内存的地址,当address被清除的时候,堆外内存的数据也会清除
但是DirectBuffer,他是不会在jvm堆中在生成对象了,直接和io进行交互,也就不会有拷贝的过程,所以称之为零拷贝,堆外内存由操作系统进行维护
public static void main(String [] args) throws Exception{
FileInputStream fileInputStream = new FileInputStream("");
FileChannel inputChannel = fileInputStream.getChannel();
FileOutputStream fileOutputStream = new FileOutputStream("");
FileChannel outputChannel = fileOutputStream.getChannel();
ByteBuffer byteBuffer = ByteBuffer.allocateDirect(512);//分配直接内存缓冲
while(true){
byteBuffer.clear();
int read = inputChannel.read(byteBuffer);
if(-1 == read) break;
byteBuffer.flip();
outputChannel.write(byteBuffer);
}
fileInputStream.close();
fileOutputStream.close();
}
(2)mappedByteBuffer
内存映射文件:允许java直接从内存访问的特殊文件
public static void main(String [] args) throws Exception{
/**
* 内存映射文件,直接在内存中修改文件内容
*/
RandomAccessFile randomAccessFile = new RandomAccessFile("C:\\data\\report\\upload\\2019-06\\ddd.txt","rw");
FileChannel fileChannel = randomAccessFile.getChannel();
int count = 0;
MappedByteBuffer mappedByteBuffer = fileChannel.map(FileChannel.MapMode.READ_WRITE,0,fileChannel.size());
for(int x=0;x<fileChannel.size();x++){
if(mappedByteBuffer.get(x)==10){
count = x;
break;
}
}
int length = (int)fileChannel.size();
byte [] bytes = new byte[length-count];
mappedByteBuffer.get(bytes,count+1,length-1);
mappedByteBuffer.flip();
mappedByteBuffer.put(bytes,0,length-count);
randomAccessFile.close();
}
(3)Scattering与Gathering:
Scattering将来自于一个channel的数据读到多个buffer,按照顺序,第一个buffer满了之后,读读到第二个,实例:实现数据的分门别类,比如网络操作自定义协议,(请求头1,请求头2,请求体)把第一个请求头读取到第一个buffer,第二个读取到第二个buffer,不用读取之后再次解析数据。
Gathering按照顺序,将多个buffer的内容写到channel中:
public static void main(String [] args) throws Exception{
ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
InetSocketAddress address = new InetSocketAddress(8898);
serverSocketChannel.socket().bind(address);
int messageLength = 9;
ByteBuffer [] buffers = new ByteBuffer[3];
buffers[0] = ByteBuffer.allocate(2);
buffers[1] = ByteBuffer.allocate(3);
buffers[2] = ByteBuffer.allocate(4);
SocketChannel socketChannel = serverSocketChannel.accept();
while(true){
int byteRead = 0;
while(byteRead < messageLength){
long r = socketChannel.read(buffers);
byteRead += r;
System.out.println("byteRead:"+byteRead);
Arrays.asList(buffers).stream().map(buffer->"position:"+
buffer.position()+",limit:"+buffer.limit()).forEach(System.out::println);
}
Arrays.asList(buffers).forEach(buffer->{
buffer.flip();
});
long byteWritten = 0;
while(byteWritten<messageLength){
long r = socketChannel.write(buffers);
byteWritten += r;
}
Arrays.asList(buffers).forEach(byteBuffer -> {
byteBuffer.clear();
});
System.out.println("read:"+byteRead+","+"write:"+byteWritten);
}
}
- channel
与stream不同,channel是双向的,一个流只可能是InputOutStream或者OutputStream,channel打开后可以进行读取或者写入
channel可以更好的反映底层操作系统的真实情况。因为在linux中,底层操作系统的通道就是双向的
- selector:
普通网络编程:每一个连接都需要服务端启动一个线程来进行处理,比较消耗资源,所以适合并发场景不是很高的场景
传统的基于io的网络编程:
new ServerSocket();
serverSocker.bind(port);//port用于客户端和服务端进行连接的端口,实际的数据传输的端口由服务端选择未被占用的端口中随机分配
while(true){
serverSocket.accept();//阻塞,等待客户端连接
new Thread(socket).run(){
socket.getInputStream();//获取流
}
}
缺点:每个链接都需要一个线程的处理,多个线程之间上下文切换开销很大,当连接上没有数据传递的时候线程始终在运行,造成线程资源的浪费
event:事件(异步的处理)
nio编程模型特点:服务端可以使用一个线程来处理多个客户端的连接,最终处理每个连接是同一个线程,通过事件来触发。
代码1:
服务器端监听5个端口号,一个线程处理所有客户端的连接
public static void main(String [] args) throws Exception{
int [] ports = new int[5];
ports[0] = 5000;
ports[1] = 5001;
ports[2] = 5002;
ports[3] = 5003;
ports[4] = 5004;
Selector selector = Selector.open();
for(int i =0;i<ports.length;i++){
ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
serverSocketChannel.configureBlocking(false);//配置channel是否阻塞
ServerSocket serverSocket = serverSocketChannel.socket();
InetSocketAddress address = new InetSocketAddress(ports[i]);
serverSocket.bind(address);
serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);//注册selecttor并且标识感兴趣的事件,连接事件
}
while (true){
int numbers = selector.select();//表示选择的数量,即key-set的数量
//获取到selected-key的集合即获取到事件
Set<SelectionKey> selectionKeys = selector.selectedKeys();
Iterator<SelectionKey> iterator = selectionKeys.iterator();
int byteRead = 0;
while(iterator.hasNext()){
SelectionKey selectionKey = iterator.next();
if(selectionKey.isAcceptable()){//
//真正的连接,selector与channel关联
ServerSocketChannel serverSocketChannel = (ServerSocketChannel) selectionKey.channel();
SocketChannel socketChannel = serverSocketChannel.accept();
socketChannel.configureBlocking(false);
socketChannel.register(selector,SelectionKey.OP_READ);//将读事件添加到集合,这样才会到else if当中
iterator.remove();//一定要删除掉,表示这个事件已经结束,否则会重复监听
System.out.println("获取到了客户端的连接"+socketChannel);
}else if(selectionKey.isReadable()){
SocketChannel socketChannel = (SocketChannel) selectionKey.channel();
while(true){
ByteBuffer byteBuffer = ByteBuffer .allocate(512);
byteBuffer.clear();
int read = socketChannel.read(byteBuffer);
if(read<=0)break;
byteBuffer.flip();
socketChannel.write(byteBuffer);
byteRead += read;
}
System.out.println("读取:" + byteRead + ","+"来自于:" + socketChannel);
iterator.remove();
}
}
}
}
代码2:简单的聊天程序:
服务端:
public class NioTestServer8 {
private static Map<String,SocketChannel> clientMap = new HashMap<>();//维护客户端的连接信息
public static void main(String [] args) throws Exception{
ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();//建立连接的作用
serverSocketChannel.configureBlocking(false);
ServerSocket serverSocket = serverSocketChannel.socket();
serverSocket.bind(new InetSocketAddress(8899));
Selector selector = Selector.open();
serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);//开始连接的时候只有一个事件,关注连接事件
while(true){
try {
selector.select();
Set<SelectionKey> selectionKeySet = selector.selectedKeys();
selectionKeySet.forEach(selectionKey -> {
final SocketChannel client ;
try{
if(selectionKey.isAcceptable()){
//连接建立,获取到连接到的channl,注册更多的感兴趣的事件
ServerSocketChannel serverSocketChannel1 = (ServerSocketChannel)selectionKey.channel();
client = serverSocketChannel1.accept();//真实的连接
client.configureBlocking(false);
client.register(selector,SelectionKey.OP_READ);//关注读取事件
String uuid = UUID.randomUUID()+"";
clientMap.put(uuid,client);
}else if(selectionKey.isReadable()){
client = (SocketChannel)selectionKey.channel();
ByteBuffer readBuffer = ByteBuffer.allocate(1024);
int count = client.read(readBuffer);
if(count>0){
readBuffer.flip();
Charset charset = Charset.forName("utf-8");
String receiveMessage = String.valueOf(charset.decode(readBuffer).array());
System.out.println(client + ":" + receiveMessage);
String sendKey = "";
for(Map.Entry<String, SocketChannel> entry:clientMap.entrySet()){
if(client == entry.getValue()){
sendKey = entry.getKey();
break;
}
}
for(Map.Entry<String, SocketChannel> entry:clientMap.entrySet()){
SocketChannel value = entry.getValue();
ByteBuffer writeBuffer = ByteBuffer.allocate(1024);
writeBuffer.put((sendKey +":"+receiveMessage).getBytes());
writeBuffer.flip();
value.write(writeBuffer);
}
}
}
}catch (Exception e){
}
});
selectionKeySet.clear();
//开始的时候连接通道标识了连接事件,只对连接事件感兴趣
//获取到真正的通道的时候标识了读事件,只对读事件感兴趣
//每次访问后都remove()相应的SelectionKey,但是移除了selectedKeys中的SelectionKey不代表移除了selector中的channel信息(这点很重要)
}catch (Exception e){
}
}
}
}
|
客户端:
package nio;
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.SocketChannel;
import java.time.LocalDateTime;
import java.util.Set;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
public class NioTestClient8 {
public static void main(String [] args){
try{
SocketChannel socketChannel = SocketChannel.open();
socketChannel.configureBlocking(false);
Selector selector = Selector.open();
socketChannel.register(selector, SelectionKey.OP_CONNECT);
socketChannel.connect(new InetSocketAddress("127.0.0.1",8899));
while(true){
selector.select();
Set<SelectionKey> selectionKeySet = selector.selectedKeys();
for(SelectionKey selectionKey : selectionKeySet){
SocketChannel client = (SocketChannel)selectionKey.channel();
if(selectionKey.isConnectable()){
if(client.isConnectionPending()){
//连接是否处在正在进行的状态
client.finishConnect();//完成连接,连接已经建立
ByteBuffer writeBuffer = ByteBuffer.allocate(1024);
writeBuffer.put((LocalDateTime.now()+"连接成功").getBytes());
writeBuffer.flip();
client.write(writeBuffer);
ExecutorService executorService = Executors.newSingleThreadExecutor(Executors.defaultThreadFactory());
executorService.submit(new Runnable() {
@Override
public void run() {
while(true){
try {
writeBuffer.clear();
InputStreamReader inputStreamReader = new InputStreamReader(System.in);
BufferedReader br = new BufferedReader(inputStreamReader);
String sendMessage = br.readLine();
writeBuffer.put(sendMessage.getBytes());
writeBuffer.flip();
client.write(writeBuffer);
} catch (Exception e) {
e.printStackTrace();
}
}
}
});
}
client.register(selector,SelectionKey.OP_READ);
} else if(selectionKey.isReadable()){
SocketChannel socketChannel1 = (SocketChannel)selectionKey.channel();
ByteBuffer byteBuffer = ByteBuffer.allocate(1024);
int count = client.read(byteBuffer);
if(count>0){
String receiveMessage = new String(byteBuffer.array(),0,count);
System.out.println(receiveMessage);
}
}
selectionKeySet.clear();
}
}
}catch(Exception e) {
e.printStackTrace();
}
}
}
|
3.零拷贝深入分析:
(1)普通io操作:
linux系统上用户空间:userspace比如socketChannel
内核空间:kernel space
hardware:硬件,或者外设
用户空间向内核空间发送读取数据的指令,从用户空间模式切换成内核空间模式,内核空间通过直接内存访问磁盘读取数据到内核空间的缓冲区当中,然后再把缓冲区的数据拷贝到用户空间的缓冲区。然后开始执行逻辑代码,read操作完毕。
write操作:将读到的数据拷贝到内核空间,内核空间将数据写到外设(socketChannel)
(2)系统层面的零拷贝:
sendfile指令:用户空间没有拷贝操作,全部都是在内核空间进行,而且只有两次上下文的切换
(3)通过Scatter和gather直接读取数据到内核空间缓冲区:
(4)真正的零拷贝全流程:
dma直接内存访问
将硬盘的数据拷贝到内核空间缓冲区,将数据描述符(引用地址)拷贝到socket buffer,这就是scatter操作,协议引擎就使用gather操作读取这两个数据发送出去
四.Reactor模式:
netty的最最基础模式:
Doug lea写的scalable IO in java是对reator模式的最好阐述,读懂了这个文档就读懂了reator
其他文档:https://www.cnblogs.com/crazymakercircle/p/9833847.html
基础设计(单线程):就是一个线程,能够检测客户端向服务端发送的连接,当连接(事件)建立之后,reactor会分发正确的handler进行处理,acceptor的作用就是接受连接,也就是连接事件。
Reactor意图:处理一个或者多个客户端并发的向一个应用发送请求。
reactor一共有五种角色:
Handle:句柄或者描述符,本质上表示一种资源是由操作系统提供:该资源用于表示一个个的事件,比如文件描述符,或者网络编程中的socket描述符。既可以来自内部,也可以来自外部,比如客户端向服务端发送数据,内部事件:操作系统产生的定时器事件。
synchronize event demultiplexer 同步事务分离器:阻塞的
本身是一个系统调用,用于等待事件的发生,事件可能是一个或者多个,调用方在调用的时候会被阻塞,一直阻塞到同步事件分离器上有事件产生为止。对于linux来说,指的就是常用的io多路复用比如select,poll,epoll.对应nio中的selector
event Handler 事件处理器:
本身由多个回调方法构成,这些回调方法构成了与应用相关的对于某个事件的反馈机制
concrete event handler 具体事件处理器,是event handler的实现,本质上就是处理器的实现
initiation disapatcher 初始分发器:
实际上就是Reactor角色,本身定义了一些规范,这些规范用于控制事件的调度方式,同时又提供了应用进行事件处理器的注册、删除等设施。本身是事件处理器的核心所在,会通过同步事件分离器等待事件的发生,首先会分理出每一个事件,然后调用事件处理器,然后调用相关的回调方法来处理这些事件。
总结nio和reactor:
nio中的selector是一种最简扑的reactor模式:
nio 首先open一个ServerSocketChannel,然后configureBlocking为false,然后或缺serversocket并绑定端口。
打开selector,注册一个连接事件
死循环:不断监听不同通道产生的时间
遍历selectionKey事件,可以获取到对应的通道,将通道和通道感兴趣的时间注册到selector中
业务逻辑处理不同的事件。
reactor对nio进行封装:1.当应用想initiation disapatcher注册具体事件处理器时,应用会标识出该事件处理器希望initiation disapatcher在某个事件发生的时候向其通知该事件,该事件与handle关联
2.initiation disapatcher会要求每个事件处理器向其内部传递handle,该handle向操作系统标识了事件处理器
3.当所有的事件处理器注册完毕后,应用会调用handle_event来启动initiation disapatcher的事件循环
这时,initiation disapatcher会将每个事件处理器的handle合并起来,并使用同步事件分离器等待事件的发生,比如说tcp协议层会使用同步事件分离器操作来等待客户端发送的数据到连接的socket handle上
4.当与某个事件对应的handle变成ready状态的时候,比如说 tcp,socket 变为等待状态时,同步事件分离器就会通知initiation disapatcher
5.initiation disapatcher会触发事件处理器的回调方法,从而响应这个处于ready状态的handle,当事件发生的时候,initiation disapatcher会被事件源**handle作为key来寻找分发恰当的事件处理器回调方法
6.initiation disapatcher会回调事件处理器的handle_events回调方法来执行特定于应用的功能,从而响应这个事件,所发生的的事件类型可以作为该方法参数并被该方法内部使用来执行额外的特定于服务的分离和分发
五.netty源码分析:
六.netty的实际应用: