nameserver
创建nameserver
可以看到我们启动 nameserver,就是执行 NamesrvStartup 类的main方法。看起来比较简单,应该就是创建了一个nameserver的控制器然后启动,这样 broker 就可以注册上来了。
首先,我们就看看 createNamesrvController() 方法,他具体是怎么创建的。
我们启动 nameserver,就是执行 runserver.sh 脚本,它里面封装了一堆 jvm 启动命令,其实就和我们自己部署 jar 到服务器上没什么两样。整段命令简化出来差不多就是 java -server -Xms4g -Xmx4g -Xmn2g org.apache.rocketmq.namesrv.NamesrvStartup 这么一个意思。
一开是就是解析我们的 jvm 命令,然后创建了 namesreConfig、NettyServerConfig 对象,然后就基于这两个对象创建完 NamesrvController 直接返回了。其他的代码看起来似乎不重要,就是解析我们的启动参数、然后赋值到config对象而已。
现在我们就来看看,上面创建的 namesreConfig、NettyServerConfig 2个对象是干嘛的。
可以看到,这两个类里面就是一些基本的参数属性,没有其他逻辑了,那么基于 这两个对象 创建的 NamesrvController 应该就是拿到这些配置信息进行初始化,底层我们猜测应该是基于 nettyserver 监听 9876 端口,然后 brocker 进行注册、producer 拉取元数据。
也就是说到此位置,namesrv 已经创建完成,但此时还没有办法对外提供服务,所以我们接下来应该看看 start() 方法了。
启动netty服务器
整个代码也比较简洁,initialize() 完之后,就直接 start() 启动了。一个好的开源框架、主流程一定是比较清晰的。我之前看 eureka-client 代码的时候,那代码是真的一言难尽,层次含糊不清、到处硬编码、逻辑也不严谨。
initialize()
我们看看 NamesrvController 的 initialize() 方法,首先是load()方法,应该是在之前创建 NamesrvController 的时候给的一些配置,然后直接就创建出 NettyRemotingServer 网络服务组件 扔到线程池里面去了,NettyRemotingServer 在创建的时候,构造方法 new 了一个 ServerBootstrap ,他才是netty服务器的核心,所以可以猜测应该它是 对外提供服务的组件。
然后后面代码就是 心跳机制线程了,和我们启动没啥关系,现在我们就可以看看 start()干了些啥了。
start()
我们刚刚初始化的时候,就创建了 NettyRemotingServer ,现在刚好就是启动这个组件,那么我们启动 nameserver 的核心逻辑,必定就是这个无疑了。
@Override public void start() { // 又是一个后台执行的线程,主要是netty网络上的配置,先跳过 this.defaultEventExecutorGroup = new DefaultEventExecutorGroup( nettyServerConfig.getServerWorkerThreads(), new ThreadFactory() { private AtomicInteger threadIndex = new AtomicInteger(0); @Override public Thread newThread(Runnable r) { return new Thread(r, "NettyServerCodecThread_" + this.threadIndex.incrementAndGet()); } }); // 准备环境 prepareSharableHandlers(); ServerBootstrap childHandler = // 这里都是netty的一些配置。 this.serverBootstrap.group(this.eventLoopGroupBoss, this.eventLoopGroupSelector) .channel(useEpoll() ? EpollServerSocketChannel.class : NioServerSocketChannel.class) .option(ChannelOption.SO_BACKLOG, 1024) .option(ChannelOption.SO_REUSEADDR, true) .option(ChannelOption.SO_KEEPALIVE, false) .childOption(ChannelOption.TCP_NODELAY, true) .childOption(ChannelOption.SO_SNDBUF, nettyServerConfig.getServerSocketSndBufSize()) .childOption(ChannelOption.SO_RCVBUF, nettyServerConfig.getServerSocketRcvBufSize()) // 是设置了Netty服务器要监听的端口号,默认就是9876 .localAddress(new InetSocketAddress(this.nettyServerConfig.getListenPort())) // 这里是一大堆网络请求处理器。netty服务器收到一个请求,就会一次使用下面处理器来处理请求 .childHandler(new ChannelInitializer<SocketChannel>() { @Override public void initChannel(SocketChannel ch) throws Exception { ch.pipeline() .addLast(defaultEventExecutorGroup, HANDSHAKE_HANDLER_NAME, handshakeHandler) .addLast(defaultEventExecutorGroup, encoder, // 这是负责编码解码的 new NettyDecoder(), // 这是负责连接空闲管理的 new IdleStateHandler(0, 0, nettyServerConfig.getServerChannelMaxIdleTimeSeconds()), // 这是负责网络连接管理的 connectionManageHandler, // 这是负责关键的网络请求处理的 serverHandler ); } }); if (nettyServerConfig.isServerPooledByteBufAllocatorEnable()) { childHandler.childOption(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT); } try { // 这里就是启动netty服务器了,bind方法就是绑定和监听一个端口号 ChannelFuture sync = this.serverBootstrap.bind().sync(); InetSocketAddress addr = (InetSocketAddress) sync.channel().localAddress(); this.port = addr.getPort(); } catch (InterruptedException e1) { throw new RuntimeException("this.serverBootstrap.bind().sync() InterruptedException", e1); } if (this.channelEventListener != null) { this.nettyEventExecutor.start(); } this.timer.scheduleAtFixedRate(new TimerTask() { @Override public void run() { try { NettyRemotingServer.this.scanResponseTable(); } catch (Throwable e) { log.error("scanResponseTable exception", e); } } }, 1000 * 3, 1000); }
可以看到 nameserver 的启动并不复杂,画个图简单梳理下。
broker
创建BrokerController
BrokerStartup 启动的时候,先是 createBrokerController() 创建了一个broker 控制器,然后就 start() 启动了,核心就是基于启动命令创建 brokerConfig,nettyServerConfig,nettyClientConfig,messageStoreConfig 4个配置文件,从而构造出 brokerController 并初始化。
public static BrokerController createBrokerController(String[] args) { System.setProperty(RemotingCommand.REMOTING_VERSION_KEY, Integer.toString(MQVersion.CURRENT_VERSION)); // 设置系统变量什么的 if (null == System.getProperty(NettySystemConfig.COM_ROCKETMQ_REMOTING_SOCKET_SNDBUF_SIZE)) { // socket的发送缓冲大小 NettySystemConfig.socketSndbufSize = 131072; } if (null == System.getProperty(NettySystemConfig.COM_ROCKETMQ_REMOTING_SOCKET_RCVBUF_SIZE)) { // socket的发送缓冲大小 NettySystemConfig.socketRcvbufSize = 131072; } try { // 解析启动脚本java命令的 Options options = ServerUtil.buildCommandlineOptions(new Options()); commandLine = ServerUtil.parseCmdLine("mqbroker", args, buildCommandlineOptions(options), new PosixParser()); if (null == commandLine) { System.exit(-1); } // 进行 broker netty 配置 final BrokerConfig brokerConfig = new BrokerConfig(); final NettyServerConfig nettyServerConfig = new NettyServerConfig(); final NettyClientConfig nettyClientConfig = new NettyClientConfig(); nettyClientConfig.setUseTLS(Boolean.parseBoolean(System.getProperty(TLS_ENABLE, String.valueOf(TlsSystemConfig.tlsMode == TlsMode.ENFORCING)))); // 设置监听 10911 端口 nettyServerConfig.setListenPort(10911); final MessageStoreConfig messageStoreConfig = new MessageStoreConfig(); // 如果broker是slave的话,就进行此项设置 if (BrokerRole.SLAVE == messageStoreConfig.getBrokerRole()) { int ratio = messageStoreConfig.getAccessMessageInMemoryMaxRatio() - 10; messageStoreConfig.setAccessMessageInMemoryMaxRatio(ratio); } if (commandLine.hasOption('c')) { // -c 命令是读取配置文件,覆盖到4个核心配置类中去 String file = commandLine.getOptionValue('c'); if (file != null) { configFile = file; InputStream in = new BufferedInputStream(new FileInputStream(file)); properties = new Properties(); properties.load(in); properties2SystemEnv(properties); MixAll.properties2Object(properties, brokerConfig); MixAll.properties2Object(properties, nettyServerConfig); MixAll.properties2Object(properties, nettyClientConfig); MixAll.properties2Object(properties, messageStoreConfig); BrokerPathConfigHelper.setBrokerConfigPath(file); in.close(); } } // 解析配置文件放到 brokerconfig 里面去 MixAll.properties2Object(ServerUtil.commandLine2Properties(commandLine), brokerConfig); // 没有配置 rocketmq_home 环境变量就直接退出 if (null == brokerConfig.getRocketmqHome()) { System.out.printf("Please set the %s variable in your environment to match the location of the RocketMQ installation", MixAll.ROCKETMQ_HOME_ENV); System.exit(-2); } // 读取配置的 nameserver 地址 String namesrvAddr = brokerConfig.getNamesrvAddr(); if (null != namesrvAddr) { try { String[] addrArray = namesrvAddr.split(";"); for (String addr : addrArray) { RemotingUtil.string2SocketAddress(addr); } } catch (Exception e) { System.out.printf( "The Name Server Address[%s] illegal, please set it as follows, \"127.0.0.1:9876;192.168.0.1:9876\"%n", namesrvAddr); System.exit(-3); } } // 对 broker 角色做一些判断,进行个性化处理 switch (messageStoreConfig.getBrokerRole()) { case ASYNC_MASTER: case SYNC_MASTER: brokerConfig.setBrokerId(MixAll.MASTER_ID); break; case SLAVE: if (brokerConfig.getBrokerId() <= 0) { System.out.printf("Slave's brokerId must be > 0"); System.exit(-3); } break; default: break; } if (messageStoreConfig.isEnableDLegerCommitLog()) { // 如果是基于 dleger 管理主从和 commitlog 的话,brokerid=-1 brokerConfig.setBrokerId(-1); } // 设置 ha 监听端口 messageStoreConfig.setHaListenPort(nettyServerConfig.getListenPort() + 1); // log相关 LoggerContext lc = (LoggerContext) LoggerFactory.getILoggerFactory(); JoranConfigurator configurator = new JoranConfigurator(); configurator.setContext(lc); lc.reset(); configurator.doConfigure(brokerConfig.getRocketmqHome() + "/conf/logback_broker.xml"); // 解析启动命令 if (commandLine.hasOption('p')) { InternalLogger console = InternalLoggerFactory.getLogger(LoggerName.BROKER_CONSOLE_NAME); MixAll.printObjectProperties(console, brokerConfig); MixAll.printObjectProperties(console, nettyServerConfig); MixAll.printObjectProperties(console, nettyClientConfig); MixAll.printObjectProperties(console, messageStoreConfig); System.exit(0); } else if (commandLine.hasOption('m')) { InternalLogger console = InternalLoggerFactory.getLogger(LoggerName.BROKER_CONSOLE_NAME); MixAll.printObjectProperties(console, brokerConfig, true); MixAll.printObjectProperties(console, nettyServerConfig, true); MixAll.printObjectProperties(console, nettyClientConfig, true); MixAll.printObjectProperties(console, messageStoreConfig, true); System.exit(0); } // 打印 broker 配置信息 log = InternalLoggerFactory.getLogger(LoggerName.BROKER_LOGGER_NAME); MixAll.printObjectProperties(log, brokerConfig); MixAll.printObjectProperties(log, nettyServerConfig); MixAll.printObjectProperties(log, nettyClientConfig); MixAll.printObjectProperties(log, messageStoreConfig); // 创建 brockercontroller final BrokerController controller = new BrokerController( brokerConfig, nettyServerConfig, nettyClientConfig, messageStoreConfig); // remember all configs to prevent discard controller.getConfiguration().registerConfig(properties); // 初始化 boolean initResult = controller.initialize(); if (!initResult) { controller.shutdown(); System.exit(-3); } // 关闭回调函数 Runtime.getRuntime().addShutdownHook(new Thread(new Runnable() { private volatile boolean hasShutdown = false; private AtomicInteger shutdownTimes = new AtomicInteger(0); @Override public void run() { synchronized (this) { log.info("Shutdown hook was invoked, {}", this.shutdownTimes.incrementAndGet()); if (!this.hasShutdown) { this.hasShutdown = true; long beginTime = System.currentTimeMillis(); controller.shutdown(); long consumingTimeTotal = System.currentTimeMillis() - beginTime; log.info("Shutdown hook over, consuming total time(ms): {}", consumingTimeTotal); } } } }, "ShutdownHook")); return controller; } catch (Throwable e) { e.printStackTrace(); System.exit(-1); } return null; }