Elasticsearch作为分布式集群,客户端到服务端,节点与节点间通信有TCP和Http通信协议,底层实现为Netty框架。不了解Netty的同学先了解Netty基本原理及使用https://www.cnblogs.com/zhxdxf/articles/10340791.html。
1.关于启动
HTTP请求仅提供服务端响应,节点启动时启动HTTP服务端,TCP请求时ES的节点即作为服务端,又作为客户端,需要启动Transport的服务端、客户端服务。节点启动请参考ElasticSearch源码之启动类
injector.getInstance(HttpServerTransport.class).start();//提供HttpServerTransport服务的启动
最终调用Netty4HttpServerTransport中的doStart()方法
serverBootstrap = new ServerBootstrap();//即Netty的服务端启动方式
serverBootstrap.childHandler(configureServerChannelHandler());//添加服务端接消息的处理类
this.requestHandler = new Netty4HttpRequestHandler(transport, detailedErrorsEnabled, threadContext);
ch.pipeline().addLast("handler", requestHandler);
protected void doStart() { boolean success = false; try { this.serverOpenChannels = new Netty4OpenChannelsHandler(logger); serverBootstrap = new ServerBootstrap(); if (blockingServer) { serverBootstrap.group(new OioEventLoopGroup(workerCount, daemonThreadFactory(settings, HTTP_SERVER_WORKER_THREAD_NAME_PREFIX))); serverBootstrap.channel(OioServerSocketChannel.class); } else { serverBootstrap.group(new NioEventLoopGroup(workerCount, daemonThreadFactory(settings, HTTP_SERVER_WORKER_THREAD_NAME_PREFIX))); serverBootstrap.channel(NioServerSocketChannel.class); } serverBootstrap.childHandler(configureServerChannelHandler()); serverBootstrap.childOption(ChannelOption.TCP_NODELAY, SETTING_HTTP_TCP_NO_DELAY.get(settings)); serverBootstrap.childOption(ChannelOption.SO_KEEPALIVE, SETTING_HTTP_TCP_KEEP_ALIVE.get(settings)); final ByteSizeValue tcpSendBufferSize = SETTING_HTTP_TCP_SEND_BUFFER_SIZE.get(settings); if (tcpSendBufferSize.getBytes() > 0) { serverBootstrap.childOption(ChannelOption.SO_SNDBUF, Math.toIntExact(tcpSendBufferSize.getBytes())); } final ByteSizeValue tcpReceiveBufferSize = SETTING_HTTP_TCP_RECEIVE_BUFFER_SIZE.get(settings); if (tcpReceiveBufferSize.getBytes() > 0) { serverBootstrap.childOption(ChannelOption.SO_RCVBUF, Math.toIntExact(tcpReceiveBufferSize.getBytes())); } serverBootstrap.option(ChannelOption.RCVBUF_ALLOCATOR, recvByteBufAllocator); serverBootstrap.childOption(ChannelOption.RCVBUF_ALLOCATOR, recvByteBufAllocator); final boolean reuseAddress = SETTING_HTTP_TCP_REUSE_ADDRESS.get(settings); serverBootstrap.option(ChannelOption.SO_REUSEADDR, reuseAddress); serverBootstrap.childOption(ChannelOption.SO_REUSEADDR, reuseAddress); this.boundAddress = createBoundHttpAddress(); if (logger.isInfoEnabled()) { logger.info("{}", boundAddress); } success = true; } finally { if (success == false) { doStop(); // otherwise we leak threads since we never moved to started } } }