简言:
- 首先我们得先弄清楚websocket和tcpSocket的区别。websocket也是基于tcp的应用层协议,只是在传统的socket上进行了封装。
- 然后我们要知道netty的handle支持动态增删。
综上所述,相信脑海里已经有对应的方案了。那就是动态设置编解码处理器。
关键代码如下:
// 1.socket方式服务 // 设置N秒没有读到数据,则触发一个READER_IDLE事件。 pipeline.addLast(new IdleStateHandler(readerIdleTime,0,0, TimeUnit.SECONDS)); pipeline.addLast("active",new ChannelActiveHandle()); pipeline.addLast("socketChoose",new SocketChooseHandle()); //tcpsocket编码解码handle,如果是websocket链接,会将其删除 pipeline.addLast("lengthEncode",new LengthFieldPrepender(4, false)); pipeline.addLast("lengthDecoder",new LengthFieldBasedFrameDecoder(2000, 0, 4,0, 4)); pipeline.addLast(bytebufToByteHandle); //因为接收类型的泛型不对,所以在websocket握手的时候不会进入该handle //此handle为最后的socket消息分解,web和tcp通用 pipeline.addLast("byteToBuf",byteToByteBufHandle); pipeline.addLast("protocolResolve",protocolResolveHandle);
/** * 协议初始化解码器. * * 用来判定实际使用什么协议.</b> * */ public class SocketChooseHandle extends ByteToMessageDecoder { /** 默认暗号长度为23 */ private static final int MAX_LENGTH = 23; /** WebSocket握手的协议前缀 */ private static final String WEBSOCKET_PREFIX = "GET /"; @Override protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception { String protocol = getBufStart(in); if (protocol.startsWith(WEBSOCKET_PREFIX)) { SpringApplicationContextHolder.getSpringBeanForClass(PipelineAdd.class).websocketAdd(ctx); ctx.pipeline().remove(LengthFieldBasedFrameDecoder.class); ctx.pipeline().remove(LengthFieldPrepender.class); ctx.pipeline().remove(BytebufToByteHandle.class); } in.resetReaderIndex(); ctx.pipeline().remove(this.getClass()); } private String getBufStart(ByteBuf in){ int length = in.readableBytes(); if (length > MAX_LENGTH) { length = MAX_LENGTH; } // 标记读位置 in.markReaderIndex(); byte[] content = new byte[length]; in.readBytes(content); return new String(content); } }
public void websocketAdd(ChannelHandlerContext ctx){ // HttpServerCodec:将请求和应答消息解码为HTTP消息 ctx.pipeline().addBefore("byteToBuf","http-codec",new HttpServerCodec()); // HttpObjectAggregator:将HTTP消息的多个部分合成一条完整的HTTP消息 ctx.pipeline().addBefore("byteToBuf","aggregator",new HttpObjectAggregator(65535)); // ChunkedWriteHandler:向客户端发送HTML5文件 ctx.pipeline().addBefore("byteToBuf","http-chunked",new ChunkedWriteHandler()); ctx.pipeline().addBefore("byteToBuf","WebSocketAggregator",new WebSocketFrameAggregator(65535)); // 在管道中添加我们自己的接收数据实现方法 ctx.pipeline().addBefore("byteToBuf","ws-handShake",wsHandShakeServerHandle); // 后续直接走消息处理 ctx.pipeline().addBefore("byteToBuf","wsPack",wsPacketHandle); // 编码。将通用byteBuf编码成binaryWebSocketFrame.通过前面的编码器 ctx.pipeline().addBefore("byteToBuf","bufToFrame",bytebufToBinaryFrameHandle); }
说明:
首先我们先添加好SocketChooseHandle(),这是我们的handle判断处理器。如果判断协议是以GET /开头的话,那么必定是websocket的连接握手。
而又因为socket连接是不进SocketChooseHandle的破方法的,导致我们必须在初始化的时候就把socket的处理写在后面。
继续说websocket的处理。
当我们检测到时websocket连接的时候,我们会移除掉socket的编解码处理器,然后再移除自己。(下次进来就直接处理websocketframe了,所以不需要再次进行这个判断处理器)
然后websocket顺利的进入动态添加的编码器,进行websocket的握手handshake。然后进行下一轮通信。
反之,如果是tcpsocket连接,会直接走tcpSocketHandle处理。这里我们用的LengthFieldBasedFrameDecoder长度占包粘包处理器。
protocolResolveHandle是我们的业务处理器handle。
详细代码请参考nafos-network: https://gitee.com/huangxinyu/BC-NETTYGO