前言
从RocketMQ启动来分析,namesrv的功能就是提供broker的注册以及client通过topic来获取routing的信息,从broker获取相应的操作过程,简单的介绍还是从源码的角度来分析,本节主要是对namesrv的主要功能,包括默认请求处理方式,集群请求处理方式,以及路由信息管理,键值对配置管理等来入手,同样也是从官网提供的mockito用例入手。
KVConfig包
namesrv的功能其中之一就是提供broker的注册以及路由的管理,而broker的注册信息就从这个包里体现出关联关系,譬如下面从kvconfig包来分析定义的KVConfig的包装类信息,从源码来看是提供了一个包装类,这个类同样是实现了定义的序列化接口RemotingSerializable
从mockito的用例分析,wraper类针对namesrv的注册功能,提供了键值对的方式,约定是broker-xxxx,topic-xxxx等方式提供wraper之后的序列化处理
public class KVConfigSerializeWrapperTest {
private KVConfigSerializeWrapper kvConfigSerializeWrapper;
@Before
public void setup() throws Exception {
kvConfigSerializeWrapper = new KVConfigSerializeWrapper();
}
@Test
public void testEncodeAndDecode() {
HashMap<String, HashMap<String, String>> result = new HashMap<>();
HashMap<String, String> kvs = new HashMap<>();
kvs.put("broker-name", "default-broker");
kvs.put("topic-name", "default-topic");
kvs.put("cid", "default-consumer-name");
result.put(NamesrvUtil.NAMESPACE_ORDER_TOPIC_CONFIG, kvs);
kvConfigSerializeWrapper.setConfigTable(result);
byte[] serializeByte = KVConfigSerializeWrapper.encode(kvConfigSerializeWrapper);
assertThat(serializeByte).isNotNull();
KVConfigSerializeWrapper deserializeObject = KVConfigSerializeWrapper.decode(serializeByte, KVConfigSerializeWrapper.class);
assertThat(deserializeObject.getConfigTable()).containsKey(NamesrvUtil.NAMESPACE_ORDER_TOPIC_CONFIG);
assertThat(deserializeObject.getConfigTable().get(NamesrvUtil.NAMESPACE_ORDER_TOPIC_CONFIG).get("broker-name")).isEqualTo("default-broker");
assertThat(deserializeObject.getConfigTable().get(NamesrvUtil.NAMESPACE_ORDER_TOPIC_CONFIG).get("topic-name")).isEqualTo("default-topic");
assertThat(deserializeObject.getConfigTable().get(NamesrvUtil.NAMESPACE_ORDER_TOPIC_CONFIG).get("cid")).isEqualTo("default-consumer-name");
}
}
而通过对KVConfigWraper类的源码分析,下面就是针对这个包装类对KVConfigManager类进行应用。通过架构图,该类是单独的应用类,没有任何继承实现关系,而提供了针对KVConfig的信息的获取,提供了CRUD的操作方式,值得注意的是这里的所有的操作都使用了AQS性质的reentranlock来保证线程安全。下面就从命名空间获取的KV信息方法来看看针对数据安全方面是如何保证的,
public byte[] getKVListByNamespace(final String namespace) {
try {
this.lock.readLock().lockInterruptibly();
try {
HashMap<String, String> kvTable = this.configTable.get(namespace);
if (null != kvTable) {
KVTable table = new KVTable();
table.setTable(kvTable);
return table.encode();
}
} finally {
this.lock.readLock().unlock();
}
} catch (InterruptedException e) {
log.error("getKVListByNamespace InterruptedException", e);
}
return null;
}
routingInfo包
提供了KV方式把broker注册到namesrv之外,还提供了针对client的请求通过topic得到routing的信息,而通过架构图这里的BrokerHouseKeepingService实现了ChannelEvenListener,而ChannelEvenListener就是针对Netty的Channel做了扩展,实现通信。
通过houseKeeping的源码看出来核心的逻辑都在NamesrvController的实现
public class BrokerHousekeepingService implements ChannelEventListener {
private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.NAMESRV_LOGGER_NAME);
private final NamesrvController namesrvController;
public BrokerHousekeepingService(NamesrvController namesrvController) {
this.namesrvController = namesrvController;
}
@Override
public void onChannelConnect(String remoteAddr, Channel channel) {
}
@Override
public void onChannelClose(String remoteAddr, Channel channel) {
this.namesrvController.getRouteInfoManager().onChannelDestroy(remoteAddr, channel);
}
@Override
public void onChannelException(String remoteAddr, Channel channel) {
this.namesrvController.getRouteInfoManager().onChannelDestroy(remoteAddr, channel);
}
@Override
public void onChannelIdle(String remoteAddr, Channel channel) {
this.namesrvController.getRouteInfoManager().onChannelDestroy(remoteAddr, channel);
}
}
而重点分析namesrvController,这个类也是没有任何的继承实现,使用了Excutor扩展的类,实现定时任务,对broker的心跳的检查
this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
@Override
public void run() {
NamesrvController.this.routeInfoManager.scanNotActiveBroker();
}
}, 5, 10, TimeUnit.SECONDS);
this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
@Override
public void run() {
NamesrvController.this.kvConfigManager.printAllPeriodically();
}
}, 1, 10, TimeUnit.MINUTES);
以及下面将要介绍的processor包的方法封装,对请求注册的类型进行分类注册到namesrv上。
private void registerProcessor() {
if (namesrvConfig.isClusterTest()) {
this.remotingServer.registerDefaultProcessor(new ClusterTestRequestProcessor(this, namesrvConfig.getProductEnvName()),
this.remotingExecutor);
} else {
this.remotingServer.registerDefaultProcessor(new DefaultRequestProcessor(this), this.remotingExecutor);
}
}
这里多提一个点就是,实现ssl安全加密也是使用了netty的sslHelper实现,具体的netty源码暂时不在本节提供,后续netty源码分析的时候再综合RocketMQ的源码引用再作为例子来详细说
public void loadSslContext() {
TlsMode tlsMode = TlsSystemConfig.tlsMode;
log.info("Server is running in TLS {} mode", tlsMode.getName());
if (tlsMode != TlsMode.DISABLED) {
try {
sslContext = TlsHelper.buildSslContext(false);
log.info("SSLContext created for server");
} catch (CertificateException e) {
log.error("Failed to create SSLContext for server", e);
} catch (IOException e) {
log.error("Failed to create SSLContext for server", e);
}
}
}
processor包
namesrv提供了KV的配置管理包之外,还提供了针对请求的不同类型进行处理,分别从针对集群模式的,以及默认模式的方式,值得注意的是这里的requestProcessor处理都是长连接方式处理,通过Netty的通信实现,下面针对请求的方式进行分析
而DefaultRequestProcessor的核心方法是processRequest(ChannelHandlerContext ctx, RemotingCommand request),这个方法封装了前面章节提到的common模块的RemotingHelper类的封装。
public static String parseChannelRemoteAddr(final Channel channel) {
if (null == channel) {
return "";
}
SocketAddress remote = channel.remoteAddress();
final String addr = remote != null ? remote.toString() : "";
if (addr.length() > 0) {
int index = addr.lastIndexOf("/");
if (index >= 0) {
return addr.substring(index + 1);
}
return addr;
}
return "";
}
而集群模式的请求核心的代码逻辑如下TopicRouteData 的获取过程。
@Override
public RemotingCommand getRouteInfoByTopic(ChannelHandlerContext ctx,
RemotingCommand request) throws RemotingCommandException {
final RemotingCommand response = RemotingCommand.createResponseCommand(null);
final GetRouteInfoRequestHeader requestHeader =
(GetRouteInfoRequestHeader) request.decodeCommandCustomHeader(GetRouteInfoRequestHeader.class);
//这里调用controller的方法获取topic的具体数据
TopicRouteData topicRouteData = this.namesrvController.getRouteInfoManager().pickupTopicRouteData(requestHeader.getTopic());
if (topicRouteData != null) {
String orderTopicConf =
this.namesrvController.getKvConfigManager().getKVConfig(NamesrvUtil.NAMESPACE_ORDER_TOPIC_CONFIG,
requestHeader.getTopic());
topicRouteData.setOrderTopicConf(orderTopicConf);
} else {
try {
topicRouteData = adminExt.examineTopicRouteInfo(requestHeader.getTopic());
} catch (Exception e) {
log.info("get route info by topic from product environment failed. envName={},", productEnvName);
}
}
if (topicRouteData != null) {
byte[] content = topicRouteData.encode();
response.setBody(content);
response.setCode(ResponseCode.SUCCESS);
response.setRemark(null);
return response;
}
response.setCode(ResponseCode.TOPIC_NOT_EXIST);
response.setRemark("No topic route info in name server for the topic: " + requestHeader.getTopic()
+ FAQUrl.suggestTodo(FAQUrl.APPLY_TOPIC_URL));
return response;
}
总结
整个namesrv的模块还涉及数据安全方面的问题,使用了线程的控制countDownLatch来控制,这里countdown的源码也是非常有参考价值分别使用了JDK的unsafe类以及AQS的知识,后续有时间再补充这块的内容。