石墨文档:https://shimo.im/docs/tHwJJcvKl2AIiCZD/
(二期)18、开源t-io项目解读
t-io:
websocket
- 同步
- 异步
- 阻塞
- 非阻塞
- 一个连接一个线程
- 一个请求一个线程
**
Java对BIO、NIO、AIO的支持:
- Java BIO : 同步并阻塞,服务器实现模式为一个连接一个线程,即客户端有连接请求时服务器端就需要启动一个线程进行处理,如果这个连接不做任何事情会造成不必要的线程开销,当然可以通过线程池机制改善。
- Java NIO : 同步非阻塞,服务器实现模式为一个请求一个线程,即客户端发送的连接请求都会注册到多路复用器上,多路复用器轮询到连接有I/O请求时才启动一个线程进行处理。
- Java AIO(NIO.2) : 异步非阻塞,服务器实现模式为一个有效请求一个线程,客户端的I/O请求都是由OS先完成了再通知服务器应用去启动线程进行处理
BIO、NIO、AIO适用场景分析:
- BIO方式适用于连接数目比较小且固定的架构,这种方式对服务器资源要求比较高,并发局限于应用中,JDK1.4以前的唯一选择,但程序直观简单易理解。
- NIO方式适用于连接数目多且连接比较短(轻操作)的架构,比如聊天服务器,并发局限于应用中,编程比较复杂,JDK1.4开始支持。
- AIO方式使用于连接数目多且连接比较长(重操作)的架构,比如相册服务器,充分调用OS参与并发操作,编程比较复杂,JDK7开始支持。
- ChannelContext(通道上下文)
- 每一个 tcp 连接的建立都会产生一个 ChannelContext 对象
- ServerChannelContext
- ChannelContext 的子类,当用 tio 作 tcp 服务器时,业务层接触的是这个类的实例。
- ClientChannelContext
- ChannelContext 的子类,当用 tio 作 tcp 客户端时,业务层接触的是这个类的实例
- GroupContext(服务配置与维护)
- GroupContext 就是用来配置线程池、确定监听端口,维护客户端各种数据等的
- ClientGroupContext
- ServerGroupContext
- AioHandler(消息处理接口)
- 处理消息的核心接口,它有两个子接口
- ClientAioHandler
- ServerAioHandler
- AioListener(通道监听者)
- 处理事件监听的核心接口,它有两个子接口,
- ClientAioListener
- ServerAioListener
- Packet(应用层数据包)
- TCP 层过来的数据,都会被 tio 要求解码成 Packet 对象,应用都需要继承这个类,从而实现自己的业务 数据包。
- AioServer(tio 服务端入口类)
- AioClient(tio 客户端入口类)
- ObjWithLock(自带读写锁的对象)
- 是一个自带了一把(读写)锁的普通对象(一般是集合对象),每当要对 这个对象进行同步安全操作(并发下对集合进行遍历或对集合对象进行元素修改删除增加)时,就得用这个 锁。
t-io是基于tcp层协议的一个网络框架,所以在应用层与tcp传输层之间设计到一个数据的编码与解码问题,t-io让我们能自定义数据协议,所以需要我们自己手动去编码解码过程。
- git项目地址
https://gitee.com/tywo45/tio-showcase
- 分为server和client工程,server和client共用common工程
- 服务端和客户端的消息协议比较简单,消息头为4个字节,用以表示消息体的长度,消息体为一个字符串的byte[]
- 服务端先启动,监听6789端口
- 客户端连接到服务端后,会主动向服务器发送一条消息
- 服务器收到消息后会回应一条消息
- 之后,框架层会自动从客户端发心跳到服务器,服务器也会检测心跳有没有超时(这些事都是框架做的,业务层只需要配一个心跳超时参数即可)
- 框架层会在断链后自动重连(这些事都是框架做的,业务层只需要配一个重连配置对象即可)
- 导入核心包
<dependency>
<groupId>org.t-io</groupId>
<artifactId>tio-core</artifactId>
</dependency>
- HelloServerStarter
- HelloServerAioHandler
- HelloPacket
- Const
- HelloClientStarter
- HelloClientAioHandler
- idea请安装PlantUML intergration插件
(初始化服务器)
(客户端与服务端通讯流程)
- git项目地址
https://gitee.com/tywo45/tio-showcase
- #processCommand
LoginReqBody loginReqBody = new LoginReqBody();
loginReqBody.setLoginname(loginname);
loginReqBody.setPassword(password);
ShowcasePacket reqPacket = new ShowcasePacket();
#这里指定消息类型
reqPacket.setType(Type.LOGIN_REQ);
reqPacket.setBody(Json.toJson(loginReqBody).getBytes(ShowcasePacket.CHARSET));
Tio.send(clientChannelContext, reqPacket);
- LoginReqBody
- ShowcasePacket
- clientChannelContext
- #handler
ShowcasePacket showcasePacket = (ShowcasePacket) packet;
#获取消息类型
Byte type = showcasePacket.getType();
#根据消息类型找到对应的消息处理类
AbsShowcaseBsHandler<?> showcaseBsHandler = handlerMap.get(type);
if (showcaseBsHandler == null) {
log.error("{}, 找不到处理类,type:{}", channelContext, type);
return;
}
#执行消息处理。消息处理类必须继承AbsShowcaseBsHandler
showcaseBsHandler.handler(showcasePacket, channelContext);
- handlerMap
- AbsShowcaseBsHandler
private static Map<Byte, AbsShowcaseBsHandler<?>> handlerMap = new HashMap<>();
static {
#把消息类型与消息处理类映射起来
handlerMap.put(Type.GROUP_MSG_REQ, new GroupMsgReqHandler());
handlerMap.put(Type.HEART_BEAT_REQ, new HeartbeatReqHandler());
handlerMap.put(Type.JOIN_GROUP_REQ, new JoinGroupReqHandler());
handlerMap.put(Type.LOGIN_REQ, new LoginReqHandler());
handlerMap.put(Type.P2P_REQ, new P2PReqHandler());
}
- #handler
log.info("收到点对点请求消息:{}", Json.toJson(bsBody));
ShowcaseSessionContext showcaseSessionContext = (ShowcaseSessionContext) channelContext.getAttribute();
P2PRespBody p2pRespBody = new P2PRespBody();
p2pRespBody.setFromUserid(showcaseSessionContext.getUserid());
p2pRespBody.setText(bsBody.getText());
ShowcasePacket respPacket = new ShowcasePacket();
respPacket.setType(Type.P2P_RESP);
respPacket.setBody(Json.toJson(p2pRespBody).getBytes(ShowcasePacket.CHARSET));
Tio.sendToUser(channelContext.groupContext, bsBody.getToUserid(), respPacket);
项目集成:
<dependency>
<groupId>org.t-io</groupId>
<artifactId>tio-websocket-server</artifactId>
<version>0.0.5-tio-websocket</version>
</dependency>
代码结构
事件定义
new Object() {
@Subscribe
public void lister(Integer integer) {
System.out.printf("%d from int%n", integer);
}
}
事件发布
//定义事件
final EventBus eventBus = new EventBus();
//注册事件
eventBus.register(new Object() {
//使用@Subscribe说明订阅事件处理方法
@Subscribe
public void lister(Integer integer) {
System.out.printf("%s from int%n", integer);
}
@Subscribe
public void lister(Number integer) {
System.out.printf("%s from Number%n", integer);
}
@Subscribe
public void lister(Long integer) {
System.out.printf("%s from long%n", integer);
}
});
//发布事件
eventBus.post(1);
eventBus.post(1L);
项目的而运用
主要处理事件
关键类:
- :封装了事件的监听注册,以及发布动作
- :发布的内容封装类,包含消息类型和消息内容字段
- 事件处理抽象类,具体处理器需要继承这个重写handler()方法
- 添加好友成功通知处理类。
- 消息类型常量
调用:
- #handleFriendApply:好友同意好友请求之后发布事件
逻辑:
- #pushAddFriendMessage
/**
* 添加好友成功之后向对方推送消息
* */
public static void pushAddFriendMessage(long applyid){
if(applyid==0){
return;
}
Apply apply = applyService.getApply(applyid);
ChannelContext channelContext = getChannelContext(""+apply.getUid());
//先判断是否在线,再去查询数据库,减少查询次数
if (channelContext != null && !channelContext.isClosed()) {
LayimToClientAddFriendMsgBody body = new LayimToClientAddFriendMsgBody();
User user = getUserService().getUser(apply.getToid());
if (user==null){return;}
//对方分组ID
body.setGroupid(apply.getGroup());
//当前用户的基本信息,用于调用layim.addList
body.setAvatar(user.getAvatar());
body.setId(user.getId());
body.setSign(user.getSign());
body.setType("friend");
body.setUsername(user.getUserName());
push(channelContext, body);
}
}
- #push
/**
* 服务端主动推送消息
* */
private static void push(ChannelContext channelContext,Object msg) {
try {
WsResponse response = BodyConvert.getInstance().convertToTextResponse(msg);
Aio.send(channelContext, response);
}catch (IOException ex){
}
}
- 登录功能
- 单聊功能
- 群聊功能
- 其他自定义消息提醒功能
- 等等。。。。
- templates/index.html
layim.config({
//初始化接口
init: {
url: '/layim/base'
}
//查看群员接口
,members: {
url: '/layim/members'
}
//上传图片接口
,uploadImage: {url: '/upload/file'}
//上传文件接口
,uploadFile: {url: '/upload/file'}
,isAudio: true //开启聊天工具栏音频
,isVideo: true //开启聊天工具栏视频
,initSkin: '5.jpg' //1-5 设置初始背景
,notice: true //是否开启桌面消息提醒,默认false
,msgbox: '/layim/msgbox'
,find: layui.cache.dir + 'css/modules/layim/html/find.html' //发现页面地址,若不开启,剔除该项即可
,chatLog: layui.cache.dir + 'css/modules/layim/html/chatLog.html' //聊天记录页面地址,若不开启,剔除该项即可
});
socket.config({
log:true,
token:'/layim/token',
server:'ws://127.0.0.1:8888'
});
- templates/layim/msgbox.html
- META-INF/spring.factories
org.springframework.boot.autoconfigure.EnableAutoConfiguration= com.fyp.layim.im.server.LayimServerAutoConfig
- com.fyp.layim.im.server.LayimServerAutoConfig:自动装载的配置类
- com.fyp.layim.im.server.config.LayimServerConfig:服务的ip、端口、心跳时间等基本配置
- com.fyp.layim.im.server.LayimWebsocketStarter:初始化配置,启动t-io服务,其中配置初始化和启动都是委托给com.fyp.layim.im.server.LayimServerStarter完成。
- com.fyp.layim.im.server.LayimServerStarter:根据配置初始化serverGroupContext、初始化消息处理器
//初始化t-io的serverGroupContext
//还有消息处理器与消息类型的映射关系
public LayimServerStarter(LayimServerConfig wsServerConfig, IWsMsgHandler wsMsgHandler, TioUuid tioUuid, SynThreadPoolExecutor tioExecutor, ThreadPoolExecutor groupExecutor) throws Exception {
this.layimServerConfig = wsServerConfig;
this.wsMsgHandler = wsMsgHandler;
layimServerAioHandler = new LayimServerAioHandler(wsServerConfig, wsMsgHandler);
layimServerAioListener = new LayimServerAioListener();
serverGroupContext = new ServerGroupContext(layimServerAioHandler, layimServerAioListener, tioExecutor, groupExecutor);
//心跳时间,暂时设置为0
serverGroupContext.setHeartbeatTimeout(wsServerConfig.getHeartBeatTimeout());
serverGroupContext.setName("Tio Websocket Server for LayIM");
aioServer = new AioServer(serverGroupContext);
serverGroupContext.setTioUuid(tioUuid);
//initSsl(serverGroupContext);
//初始化消息处理器
LayimMsgProcessorManager.init();
}
- com.fyp.layim.im.server.handler.LayimServerAioHandler:继承ServerAioHandler,完成消息的解码、编码、消息处理过程
- com.fyp.layim.im.server.listener.LayimServerAioListener:继承ServerAioListener,完成事件监听
- #process
SetWithLock<ChannelContext> checkChannelContexts =
Aio.getChannelContextsByUserid(channelContext.getGroupContext(),body.getId());
- #handleHandshakeUserInfo
private HttpResponse handleHandshakeUserInfo(HttpRequest httpRequest, HttpResponse httpResponse, ChannelContext channelContext) throws Exception {
UserService userService = getUserService();
//增加token验证方法
String path = httpRequest.getRequestLine().getPath();
String token = URLDecoder.decode(path.substring(1),"utf-8");
String userId = TokenVerify.IsValid(token);
if (userId == null) {
//没有token 未授权
httpResponse.setStatus(HttpResponseStatus.C401);
} else {
long uid = Long.parseLong(userId);
//解析token
LayimContextUserInfo userInfo = userService.getContextUserInfo(uid);
if (userInfo == null) {
//没有找到用户
httpResponse.setStatus(HttpResponseStatus.C404);
} else {
channelContext.setAttribute(userId, userInfo.getContextUser());
//绑定用户ID
Aio.bindUser(channelContext, userId);
//绑定用户群组
List<String> groupIds = userInfo.getGroupIds();
//绑定用户群信息
if (groupIds != null) {
groupIds.forEach(groupId -> Aio.bindGroup(channelContext, groupId));
}
//通知所有好友本人上线了
notify(channelContext,true);
}
}
return httpResponse;
}