1.------------------------WebSocketServer 主要用于启动netty服务------------------
/**
 * Boots the Netty WebSocket server.
 */
@Component
public final class WebSocketServer {

    /** TLS is enabled whenever the {@code ssl} system property is defined, whatever its value. */
    static final boolean SSL = System.getProperty("ssl") != null;

    /**
     * Binds the server to the given port and blocks the calling thread until the
     * server channel is closed. Both event loop groups are shut down on the way out.
     *
     * @param port TCP port to listen on
     * @throws Exception if the TLS context cannot be created, or binding/waiting fails
     */
    public void start(Integer port) throws Exception {
        // The TLS context (if any) is prepared before any event loop is created.
        final SslContext sslCtx = SSL ? selfSignedContext() : null;

        EventLoopGroup acceptors = new NioEventLoopGroup(1);
        EventLoopGroup workers = new NioEventLoopGroup();
        try {
            ServerBootstrap bootstrap = new ServerBootstrap();
            bootstrap.group(acceptors, workers);
            bootstrap.channel(NioServerSocketChannel.class);
            bootstrap.handler(new LoggingHandler(LogLevel.INFO));
            bootstrap.childHandler(new WebSocketServerInitializer(sslCtx));

            Channel serverChannel = bootstrap.bind(port).sync().channel();

            String scheme = SSL ? "https" : "http";
            System.out.println("Open your web browser and navigate to "
                    + scheme + "://127.0.0.1:" + port + '/');

            // Park here until the listening channel is closed.
            serverChannel.closeFuture().sync();
        } finally {
            acceptors.shutdownGracefully();
            workers.shutdownGracefully();
        }
    }

    /** Builds a server-side TLS context backed by a freshly generated self-signed certificate. */
    private static SslContext selfSignedContext() throws Exception {
        SelfSignedCertificate certificate = new SelfSignedCertificate();
        return SslContextBuilder
                .forServer(certificate.certificate(), certificate.privateKey())
                .build();
    }
}
2.---------------------------处理http请求 用于返回演示用的首页(ws升级握手由WebSocketServerProtocolHandler完成)
/**
 * Answers plain HTTP requests: serves the demo index page for {@code /} and
 * {@code /index.html}, and replies with an error status for everything else.
 */
public class WebSocketIndexPageHandler extends SimpleChannelInboundHandler<FullHttpRequest> {

    private final String websocketPath;

    /**
     * @param websocketPath path of the WebSocket endpoint advertised to the index page
     */
    public WebSocketIndexPageHandler(String websocketPath) {
        this.websocketPath = websocketPath;
    }

    /**
     * Validates the request (decodable, GET only) and then either sends the index
     * page or a 404 response.
     */
    @Override
    protected void channelRead0(ChannelHandlerContext ctx, FullHttpRequest req) throws Exception {
        // Request could not be decoded -> 400.
        if (!req.decoderResult().isSuccess()) {
            sendHttpResponse(ctx, req, emptyResponse(ctx, req, BAD_REQUEST));
            return;
        }

        // Anything but GET -> 403.
        if (!HttpMethod.GET.equals(req.method())) {
            sendHttpResponse(ctx, req, emptyResponse(ctx, req, FORBIDDEN));
            return;
        }

        // Unknown path -> 404.
        final String uri = req.uri();
        final boolean indexRequested = "/".equals(uri) || "/index.html".equals(uri);
        if (!indexRequested) {
            sendHttpResponse(ctx, req, emptyResponse(ctx, req, NOT_FOUND));
            return;
        }

        // Index page, pointing its script at this server's WebSocket endpoint.
        String location = getWebSocketLocation(ctx.pipeline(), req, websocketPath);
        ByteBuf page = WebSocketServerIndexPage.getContent(location);
        FullHttpResponse response = new DefaultFullHttpResponse(req.protocolVersion(), OK, page);
        response.headers().set(CONTENT_TYPE, "text/html; charset=UTF-8");
        HttpUtil.setContentLength(response, page.readableBytes());
        sendHttpResponse(ctx, req, response);
    }

    /** Prints the stack trace and closes the connection. */
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
        cause.printStackTrace();
        ctx.close();
    }

    /** Creates a response with the given status and an empty, writable body buffer. */
    private static FullHttpResponse emptyResponse(ChannelHandlerContext ctx, FullHttpRequest req,
                                                  HttpResponseStatus status) {
        return new DefaultFullHttpResponse(req.protocolVersion(), status, ctx.alloc().buffer(0));
    }

    /**
     * Writes the response. Non-200 responses get the status text as their body and
     * always close the connection; 200 responses honour the request's keep-alive setting.
     */
    private static void sendHttpResponse(ChannelHandlerContext ctx, FullHttpRequest req, FullHttpResponse res) {
        final HttpResponseStatus status = res.status();
        final boolean ok = status.code() == 200;

        if (!ok) {
            // Use the status line text as a minimal error page.
            ByteBufUtil.writeUtf8(res.content(), status.toString());
            HttpUtil.setContentLength(res, res.content().readableBytes());
        }

        final boolean keepAlive = ok && HttpUtil.isKeepAlive(req);
        HttpUtil.setKeepAlive(res, keepAlive);

        ChannelFuture written = ctx.writeAndFlush(res);
        if (keepAlive) {
            return;
        }
        written.addListener(ChannelFutureListener.CLOSE);
    }

    /**
     * Builds the WebSocket URL from the request's Host header and the given path,
     * using {@code wss} when an {@link SslHandler} is present in the pipeline and
     * {@code ws} otherwise.
     */
    private static String getWebSocketLocation(ChannelPipeline cp, HttpRequest req, String path) {
        final String scheme = cp.get(SslHandler.class) == null ? "ws" : "wss";
        return scheme + "://" + req.headers().get(HttpHeaderNames.HOST) + path;
    }
}
3.------------------------静态 页面
package com.netty.server.websocketServer;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.util.CharsetUtil;
/**
 * Generates the HTML of the demo chat page.
 */
public final class WebSocketServerIndexPage {

    private static final String NEWLINE = "\r\n";

    /**
     * Builds the chat page as an ASCII-encoded buffer.
     *
     * <p>The page opens a WebSocket to {@code webSocketLocation}, appends every
     * received message to a textarea, and sends the content of the text input
     * when the "Send" button is clicked.
     *
     * <p>NOTE(review): {@code webSocketLocation} is inserted verbatim into a
     * JavaScript string literal; callers should only pass trusted values.
     *
     * @param webSocketLocation URL the page's script should connect to
     * @return a new buffer holding the page markup
     */
    public static ByteBuf getContent(String webSocketLocation) {
        return Unpooled.copiedBuffer(
                "<html><head><title>Web Socket Test</title></head>" + NEWLINE +
                "<body>" + NEWLINE +
                "<script type=\"text/javascript\">" + NEWLINE +
                "var socket;" + NEWLINE +
                "if (!window.WebSocket) {" + NEWLINE +
                " window.WebSocket = window.MozWebSocket;" + NEWLINE +
                '}' + NEWLINE +
                "if (window.WebSocket) {" + NEWLINE +
                " socket = new WebSocket(\"" + webSocketLocation + "\");" + NEWLINE +
                " socket.onmessage = function(event) {" + NEWLINE +
                " var ta = document.getElementById('responseText');" + NEWLINE +
                " ta.value = ta.value + '\\n' + event.data" + NEWLINE +
                " };" + NEWLINE +
                " socket.onopen = function(event) {" + NEWLINE +
                " var ta = document.getElementById('responseText');" + NEWLINE +
                " ta.value = \"Web Socket opened!\";" + NEWLINE +
                " };" + NEWLINE +
                " socket.onclose = function(event) {" + NEWLINE +
                " var ta = document.getElementById('responseText');" + NEWLINE +
                " ta.value = ta.value + \"Web Socket closed\"; " + NEWLINE +
                " };" + NEWLINE +
                "} else {" + NEWLINE +
                " alert(\"Your browser does not support Web Socket.\");" + NEWLINE +
                '}' + NEWLINE +
                NEWLINE +
                "function send(message) {" + NEWLINE +
                " if (!window.WebSocket) { return; }" + NEWLINE +
                " if (socket.readyState == WebSocket.OPEN) {" + NEWLINE +
                " socket.send(message);" + NEWLINE +
                " } else {" + NEWLINE +
                " alert(\"The socket is not open.\");" + NEWLINE +
                " }" + NEWLINE +
                '}' + NEWLINE +
                "</script>" + NEWLINE +
                // The event attributes must be spelled with a Latin 'o'. A look-alike
                // Greek omicron is not ASCII-encodable and would leave the form and the
                // button without their handlers.
                "<form onsubmit=\"return false;\">" + NEWLINE +
                "<input type=\"text\" name=\"message\" value=\"Hello, World!\" style=\"width:300px;height:100px;border-color:#6699FF\"/>" +
                "<input type=\"button\" value=\"Send\"" + NEWLINE +
                " onclick=\"send(this.form.message.value)\" style=\"width:100px;height:100px;background:#66FF66\" />" + NEWLINE +
                "<h1>Output</h1>" + NEWLINE +
                "<textarea id=\"responseText\" style=\"width:400px;height:500px;border:20px;solid:#378888;border-color:#6699FF;background:#66FF66\"></textarea>" + NEWLINE +
                "</form>" + NEWLINE +
                "</body>" + NEWLINE +
                "</html>" + NEWLINE, CharsetUtil.US_ASCII);
    }

    private WebSocketServerIndexPage() {
        // Static utility; never instantiated.
    }
}
4.----------------------------------具体的处理消息handler
/**
 * Handles incoming WebSocket frames for the chat demo.
 *
 * <p>Every text frame is prefixed with the sender's nickname, upper-cased and
 * broadcast to all connected clients. Any other frame type that reaches this
 * handler is rejected with an {@link UnsupportedOperationException}.
 *
 * <p>{@code TextWebSocketFrame} is Netty's carrier object for WebSocket text messages.
 */
public class WebSocketFrameHandler extends SimpleChannelInboundHandler<WebSocketFrame> {

    // All client channels that currently have this handler installed.
    private static final ChannelGroup clients =
            new DefaultChannelGroup(GlobalEventExecutor.INSTANCE);

    // Channel id -> nickname index handed out by RandomUtil.getIndex().
    private static final ConcurrentHashMap<Object, Integer> channels = new ConcurrentHashMap<>();

    // Running total of clients that have ever joined. Atomic because handlers are
    // added from several event loop threads.
    private static final java.util.concurrent.atomic.AtomicInteger count =
            new java.util.concurrent.atomic.AtomicInteger();

    /**
     * Broadcasts a received text frame to every connected client.
     *
     * @throws UnsupportedOperationException if the frame is not a text frame
     */
    @Override
    protected void channelRead0(ChannelHandlerContext ctx, WebSocketFrame frame) throws Exception {
        // Ping and pong frames are already handled earlier in the pipeline.
        if (!(frame instanceof TextWebSocketFrame)) {
            String message = "unsupported frame type: " + frame.getClass().getName();
            throw new UnsupportedOperationException(message);
        }

        String request = ((TextWebSocketFrame) frame).text();

        // Fall back to the channel id if no nickname index was registered for this
        // channel, instead of failing on a null unboxing.
        Integer index = channels.get(ctx.channel().id());
        String sender = index != null
                ? RandomUtil.map.get(index)
                : String.valueOf(ctx.channel().id());

        // Build the upper-cased payload once; the group hands each channel its own
        // duplicate of the frame.
        String broadcast = (sender + ": " + request).toUpperCase(Locale.US);
        clients.writeAndFlush(new TextWebSocketFrame(broadcast));
    }

    /**
     * Called when this handler is installed on a new client's pipeline: registers
     * the channel in the broadcast group and assigns it a nickname index.
     */
    @Override
    public void handlerAdded(ChannelHandlerContext ctx) {
        try {
            int joined = count.incrementAndGet();
            System.out.println("用户加入事件 " + ctx.channel().id() + " 第" + joined + "个用户");
            clients.add(ctx.channel());
            channels.put(ctx.channel().id(), RandomUtil.getIndex());
        } catch (Exception e) {
            e.printStackTrace();
            System.out.println(e.getMessage());
        }
    }

    /**
     * Called when this handler is removed from the pipeline: drops the channel's
     * nickname mapping so the static map does not grow with every connection.
     */
    @Override
    public void handlerRemoved(ChannelHandlerContext ctx) {
        channels.remove(ctx.channel().id());
    }

    /** Logs that the client went offline. */
    @Override
    public void channelInactive(ChannelHandlerContext ctx) throws Exception {
        Channel incoming = ctx.channel();
        System.out.println("用户:" + incoming + "掉线");
    }

    /** Logs the failure, prints the stack trace and closes the connection. */
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause)
            throws Exception {
        Channel incoming = ctx.channel();
        System.out.println("用户:" + incoming.id() + "异常");
        cause.printStackTrace();
        ctx.close();
    }
}
5.--------------------------------初始化Initializer
/**
 * Sets up the handler pipeline of every accepted client connection.
 */
public class WebSocketServerInitializer extends ChannelInitializer<SocketChannel> {

    private static final String WEBSOCKET_PATH = "/websocket";

    private final SslContext sslCtx;

    /**
     * @param sslCtx TLS context to use, or {@code null} to serve plain-text connections
     */
    public WebSocketServerInitializer(SslContext sslCtx) {
        this.sslCtx = sslCtx;
    }

    /**
     * Installs, in order: optional TLS, HTTP codec, HTTP aggregator, WebSocket
     * compression, the WebSocket protocol handler, the index page handler and
     * the frame handler.
     */
    @Override
    public void initChannel(SocketChannel ch) throws Exception {
        final ChannelPipeline chain = ch.pipeline();

        // TLS comes first so everything behind it sees decrypted traffic.
        if (sslCtx != null) {
            chain.addLast(sslCtx.newHandler(ch.alloc()));
        }

        chain
                // WebSocket starts as HTTP, so an HTTP codec is required.
                .addLast(new HttpServerCodec())
                // Aggregates HTTP message parts into full messages (limit: 64 KiB).
                // Author's note: a java.lang.NoSuchMethodError on
                // io.netty.util.internal.AppendableCharSequence.setLength(I)V at this
                // point is caused by a version conflict.
                .addLast(new HttpObjectAggregator(64 * 1024))
                // WebSocket data compression.
                .addLast(new WebSocketServerCompressionHandler())
                // WebSocket protocol handling on WEBSOCKET_PATH: no subprotocols,
                // extensions allowed.
                .addLast(new WebSocketServerProtocolHandler(WEBSOCKET_PATH, null, true))
                // Demo only: returns the initial HTML page.
                .addLast(new WebSocketIndexPageHandler(WEBSOCKET_PATH))
                // The actual message handling logic.
                .addLast(new WebSocketFrameHandler());
    }
}
6.----------------------util
package com.netty.server.websocketServer;
import java.util.HashSet;
import java.util.Iterator;
import java.util.Map;
import java.util.Random;
import java.util.Set;
import java.util.TreeSet;
import java.util.concurrent.ConcurrentHashMap;
/**
 * Hands out nickname indexes for chat users.
 *
 * <p>{@link #map} maps the indexes 0..9 to display names, and {@link #set} is the
 * pool of indexes that have not been handed out yet. Access to the pool goes
 * through synchronized methods because it is used from several threads.
 */
public class RandomUtil {

    // Number of nicknames / pool entries.
    private static final int POOL_SIZE = 10;

    static Map<Integer, String> map = new ConcurrentHashMap<>();
    static Set<Integer> set = new TreeSet<>();

    /**
     * (Re)fills the pool so that it contains every index from 0 to 9.
     *
     * <p>The pool is a sorted set, so the order in which indexes are inserted has
     * no effect on the order in which they are handed out; they are simply added
     * directly.
     */
    public static synchronized void getOne() {
        for (int i = 0; i < POOL_SIZE; i++) {
            set.add(i);
        }
    }

    static {
        getOne();
        map.put(0, "尼古拉斯儿");
        map.put(1, "张三");
        map.put(2, "李四");
        map.put(3, "王二麻子");
        map.put(4, "马六");
        map.put(5, "申公豹");
        map.put(6, "大麻花");
        map.put(7, "小垃圾");
        map.put(8, "鸡腿");
        map.put(9, "鸭腿");
    }

    /**
     * Takes the smallest remaining index out of the pool.
     *
     * @return the removed index, or {@code 0} when the pool is already empty
     */
    public static synchronized int getIndex() {
        Iterator<Integer> remaining = set.iterator();
        if (!remaining.hasNext()) {
            return 0;
        }
        int next = remaining.next();
        remaining.remove();
        return next;
    }
}
7.-----------------启动类
/**
 * Spring Boot entry point: starts the application context, then runs the
 * WebSocket server on port 8888.
 */
@SpringBootApplication
@ComponentScan("com.netty")
public class NettyServerApplication {

    public static void main(String[] args) {
        ConfigurableApplicationContext springContext =
                SpringApplication.run(NettyServerApplication.class, args);

        // Bean lookup stays outside the try block: a missing bean is not swallowed.
        WebSocketServer server = springContext.getBean(WebSocketServer.class);

        try {
            server.start(8888);
        } catch (Exception failure) {
            failure.printStackTrace();
        }
    }
}
效果图: