Elasticsearch作为分布式集群,客户端到服务端,节点与节点间通信有TCP和Http通信协议,底层实现为Netty框架。不了解Netty的同学先了解Netty基本原理及使用https://www.cnblogs.com/zhxdxf/articles/10340791.html

1.关于启动

HTTP请求仅提供服务端响应,节点启动时启动HTTP服务端,TCP请求时ES的节点即作为服务端,又作为客户端,需要启动Transport的服务端、客户端服务。节点启动请参考ElasticSearch源码之启动类

injector.getInstance(HttpServerTransport.class).start();//提供HttpServerTransport服务的启动

最终调用Netty4HttpServerTransport中的doStart()方法
serverBootstrap = new ServerBootstrap();//即Netty的服务端启动方式
serverBootstrap.childHandler(configureServerChannelHandler());//添加服务端接消息的处理类
    this.requestHandler = new Netty4HttpRequestHandler(transport, detailedErrorsEnabled, threadContext);
    ch.pipeline().addLast("handler", requestHandler);
    protected void doStart() {
        boolean success = false;
        try {
            this.serverOpenChannels = new Netty4OpenChannelsHandler(logger);

            serverBootstrap = new ServerBootstrap();
            if (blockingServer) {
                serverBootstrap.group(new OioEventLoopGroup(workerCount, daemonThreadFactory(settings,
                    HTTP_SERVER_WORKER_THREAD_NAME_PREFIX)));
                serverBootstrap.channel(OioServerSocketChannel.class);
            } else {
                serverBootstrap.group(new NioEventLoopGroup(workerCount, daemonThreadFactory(settings,
                    HTTP_SERVER_WORKER_THREAD_NAME_PREFIX)));
                serverBootstrap.channel(NioServerSocketChannel.class);
            }

            serverBootstrap.childHandler(configureServerChannelHandler());

            serverBootstrap.childOption(ChannelOption.TCP_NODELAY, SETTING_HTTP_TCP_NO_DELAY.get(settings));
            serverBootstrap.childOption(ChannelOption.SO_KEEPALIVE, SETTING_HTTP_TCP_KEEP_ALIVE.get(settings));

            final ByteSizeValue tcpSendBufferSize = SETTING_HTTP_TCP_SEND_BUFFER_SIZE.get(settings);
            if (tcpSendBufferSize.getBytes() > 0) {
                serverBootstrap.childOption(ChannelOption.SO_SNDBUF, Math.toIntExact(tcpSendBufferSize.getBytes()));
            }

            final ByteSizeValue tcpReceiveBufferSize = SETTING_HTTP_TCP_RECEIVE_BUFFER_SIZE.get(settings);
            if (tcpReceiveBufferSize.getBytes() > 0) {
                serverBootstrap.childOption(ChannelOption.SO_RCVBUF, Math.toIntExact(tcpReceiveBufferSize.getBytes()));
            }

            serverBootstrap.option(ChannelOption.RCVBUF_ALLOCATOR, recvByteBufAllocator);
            serverBootstrap.childOption(ChannelOption.RCVBUF_ALLOCATOR, recvByteBufAllocator);

            final boolean reuseAddress = SETTING_HTTP_TCP_REUSE_ADDRESS.get(settings);
            serverBootstrap.option(ChannelOption.SO_REUSEADDR, reuseAddress);
            serverBootstrap.childOption(ChannelOption.SO_REUSEADDR, reuseAddress);

            this.boundAddress = createBoundHttpAddress();
            if (logger.isInfoEnabled()) {
                logger.info("{}", boundAddress);
            }
            success = true;
        } finally {
            if (success == false) {
                doStop(); // otherwise we leak threads since we never moved to started
            }
        }
    }
View Code

相关文章: