前言

从RocketMQ启动来分析,namesrv的功能就是提供broker的注册以及client通过topic来获取routing的信息,从broker获取相应的操作过程,简单的介绍还是从源码的角度来分析,本节主要是对namesrv的主要功能,包括默认请求处理方式,集群请求处理方式,以及路由信息管理,键值对配置管理等来入手,同样也是从官网提供的mockito用例入手。

KVConfig包

namesrv的功能其中之一就是提供broker的注册以及路由的管理,而broker的注册信息就从这个包里体现出关联关系,譬如下面从kvconfig包来分析定义的KVConfig的包装类信息,从源码来看是提供了一个包装类,这个类同样是实现了定义的序列化接口RemotingSerializable从RocketMQ源码的NameSrv模块分析RocketMQ

从mockito的用例分析,wraper类针对namesrv的注册功能,提供了键值对的方式,约定是broker-xxxx,topic-xxxx等方式提供wraper之后的序列化处理

public class KVConfigSerializeWrapperTest {
    private KVConfigSerializeWrapper kvConfigSerializeWrapper;

    @Before
    public void setup() throws Exception {
        kvConfigSerializeWrapper = new KVConfigSerializeWrapper();
    }

    @Test
    public void testEncodeAndDecode() {
        HashMap<String, HashMap<String, String>> result = new HashMap<>();
        HashMap<String, String> kvs = new HashMap<>();
        kvs.put("broker-name", "default-broker");
        kvs.put("topic-name", "default-topic");
        kvs.put("cid", "default-consumer-name");
        result.put(NamesrvUtil.NAMESPACE_ORDER_TOPIC_CONFIG, kvs);
        kvConfigSerializeWrapper.setConfigTable(result);
        byte[] serializeByte = KVConfigSerializeWrapper.encode(kvConfigSerializeWrapper);
        assertThat(serializeByte).isNotNull();

        KVConfigSerializeWrapper deserializeObject = KVConfigSerializeWrapper.decode(serializeByte, KVConfigSerializeWrapper.class);
        assertThat(deserializeObject.getConfigTable()).containsKey(NamesrvUtil.NAMESPACE_ORDER_TOPIC_CONFIG);
        assertThat(deserializeObject.getConfigTable().get(NamesrvUtil.NAMESPACE_ORDER_TOPIC_CONFIG).get("broker-name")).isEqualTo("default-broker");
        assertThat(deserializeObject.getConfigTable().get(NamesrvUtil.NAMESPACE_ORDER_TOPIC_CONFIG).get("topic-name")).isEqualTo("default-topic");
        assertThat(deserializeObject.getConfigTable().get(NamesrvUtil.NAMESPACE_ORDER_TOPIC_CONFIG).get("cid")).isEqualTo("default-consumer-name");
    }

}

而通过对KVConfigWraper类的源码分析,下面就是针对这个包装类对KVConfigManager类进行应用。通过架构图,该类是单独的应用类,没有任何继承实现关系,而提供了针对KVConfig的信息的获取,提供了CRUD的操作方式,值得注意的是这里的所有的操作都使用了AQS性质的reentranlock来保证线程安全。下面就从命名空间获取的KV信息方法来看看针对数据安全方面是如何保证的,

public byte[] getKVListByNamespace(final String namespace) {
        try {
            this.lock.readLock().lockInterruptibly();
            try {
                HashMap<String, String> kvTable = this.configTable.get(namespace);
                if (null != kvTable) {
                    KVTable table = new KVTable();
                    table.setTable(kvTable);
                    return table.encode();
                }
            } finally {
                this.lock.readLock().unlock();
            }
        } catch (InterruptedException e) {
            log.error("getKVListByNamespace InterruptedException", e);
        }

        return null;
    }

routingInfo包

提供了KV方式把broker注册到namesrv之外,还提供了针对client的请求通过topic得到routing的信息,而通过架构图这里的BrokerHouseKeepingService实现了ChannelEvenListener,而ChannelEvenListener就是针对Netty的Channel做了扩展,实现通信。从RocketMQ源码的NameSrv模块分析RocketMQ
通过houseKeeping的源码看出来核心的逻辑都在NamesrvController的实现

public class BrokerHousekeepingService implements ChannelEventListener {
    private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.NAMESRV_LOGGER_NAME);
    private final NamesrvController namesrvController;

    public BrokerHousekeepingService(NamesrvController namesrvController) {
        this.namesrvController = namesrvController;
    }

    @Override
    public void onChannelConnect(String remoteAddr, Channel channel) {
    }

    @Override
    public void onChannelClose(String remoteAddr, Channel channel) {
        this.namesrvController.getRouteInfoManager().onChannelDestroy(remoteAddr, channel);
    }

    @Override
    public void onChannelException(String remoteAddr, Channel channel) {
        this.namesrvController.getRouteInfoManager().onChannelDestroy(remoteAddr, channel);
    }

    @Override
    public void onChannelIdle(String remoteAddr, Channel channel) {
        this.namesrvController.getRouteInfoManager().onChannelDestroy(remoteAddr, channel);
    }
}

而重点分析namesrvController,这个类也是没有任何的继承实现,使用了Excutor扩展的类,实现定时任务,对broker的心跳的检查

this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {

            @Override
            public void run() {
                NamesrvController.this.routeInfoManager.scanNotActiveBroker();
            }
        }, 5, 10, TimeUnit.SECONDS);

        this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {

            @Override
            public void run() {
                NamesrvController.this.kvConfigManager.printAllPeriodically();
            }
        }, 1, 10, TimeUnit.MINUTES);

以及下面将要介绍的processor包的方法封装,对请求注册的类型进行分类注册到namesrv上。

private void registerProcessor() {
        if (namesrvConfig.isClusterTest()) {

            this.remotingServer.registerDefaultProcessor(new ClusterTestRequestProcessor(this, namesrvConfig.getProductEnvName()),
                this.remotingExecutor);
        } else {

            this.remotingServer.registerDefaultProcessor(new DefaultRequestProcessor(this), this.remotingExecutor);
        }
    }

这里多提一个点就是,实现ssl安全加密也是使用了netty的sslHelper实现,具体的netty源码暂时不在本节提供,后续netty源码分析的时候再综合RocketMQ的源码引用再作为例子来详细说

public void loadSslContext() {
        TlsMode tlsMode = TlsSystemConfig.tlsMode;
        log.info("Server is running in TLS {} mode", tlsMode.getName());

        if (tlsMode != TlsMode.DISABLED) {
            try {
                sslContext = TlsHelper.buildSslContext(false);
                log.info("SSLContext created for server");
            } catch (CertificateException e) {
                log.error("Failed to create SSLContext for server", e);
            } catch (IOException e) {
                log.error("Failed to create SSLContext for server", e);
            }
        }
    }

processor包

namesrv提供了KV的配置管理包之外,还提供了针对请求的不同类型进行处理,分别从针对集群模式的,以及默认模式的方式,值得注意的是这里的requestProcessor处理都是长连接方式处理,通过Netty的通信实现,下面针对请求的方式进行分析
而DefaultRequestProcessor的核心方法是processRequest(ChannelHandlerContext ctx, RemotingCommand request),这个方法封装了前面章节提到的common模块的RemotingHelper类的封装。

public static String parseChannelRemoteAddr(final Channel channel) {
        if (null == channel) {
            return "";
        }
        SocketAddress remote = channel.remoteAddress();
        final String addr = remote != null ? remote.toString() : "";

        if (addr.length() > 0) {
            int index = addr.lastIndexOf("/");
            if (index >= 0) {
                return addr.substring(index + 1);
            }

            return addr;
        }

        return "";
    }

而集群模式的请求核心的代码逻辑如下TopicRouteData 的获取过程。

@Override
    public RemotingCommand getRouteInfoByTopic(ChannelHandlerContext ctx,
        RemotingCommand request) throws RemotingCommandException {
        final RemotingCommand response = RemotingCommand.createResponseCommand(null);
        final GetRouteInfoRequestHeader requestHeader =
            (GetRouteInfoRequestHeader) request.decodeCommandCustomHeader(GetRouteInfoRequestHeader.class);
        //这里调用controller的方法获取topic的具体数据
        TopicRouteData topicRouteData = this.namesrvController.getRouteInfoManager().pickupTopicRouteData(requestHeader.getTopic());
        if (topicRouteData != null) {
            String orderTopicConf =
                this.namesrvController.getKvConfigManager().getKVConfig(NamesrvUtil.NAMESPACE_ORDER_TOPIC_CONFIG,
                    requestHeader.getTopic());
            topicRouteData.setOrderTopicConf(orderTopicConf);
        } else {
            try {
                topicRouteData = adminExt.examineTopicRouteInfo(requestHeader.getTopic());
            } catch (Exception e) {
                log.info("get route info by topic from product environment failed. envName={},", productEnvName);
            }
        }

        if (topicRouteData != null) {
            byte[] content = topicRouteData.encode();
            response.setBody(content);
            response.setCode(ResponseCode.SUCCESS);
            response.setRemark(null);
            return response;
        }

        response.setCode(ResponseCode.TOPIC_NOT_EXIST);
        response.setRemark("No topic route info in name server for the topic: " + requestHeader.getTopic()
            + FAQUrl.suggestTodo(FAQUrl.APPLY_TOPIC_URL));
        return response;
    }

总结

整个namesrv的模块还涉及数据安全方面的问题,使用了线程的控制countDownLatch来控制,这里countdown的源码也是非常有参考价值分别使用了JDK的unsafe类以及AQS的知识,后续有时间再补充这块的内容。

相关文章: