由于工作的原因,需要在RocketMQ源码的基础上进行修改,实现自己的功能,因此打算读读源码,写写日记,后续会使用aeron替换掉netty,比较两者的性能指标。
RocketMQ主要包括NameSrv、Broker、Producer和Consumer四个模块,模块之间通过Netty进行底层网络传输,因此,RocketMQ的线程模型也是根据Netty的特点进行构建。
RocketMQ对Netty的封装
RocketMQ在原生Netty的基础上,封装了自己的Server和Client,分别为NettyRemotingServer和NettyRemotingClient,该部分的类关系如下图所示。其中NettyRemotingAbstract定义了消息在通过Netty接收并编解码为消息后的一系列业务操作;NettyRemotingServer和NettyRemotingClient是分别对Netty Server和Netty Client的接口封装。
熟悉Netty的同学都知道,Netty的客户端或者服务端在收到消息时,消息的数据流会流经在PipeLine上定义的所有ChannelHandler,以NettyRemotingServer为例,在引导创建Netty Server时,共创建了6个ChannelHandler,该部分代码如图:
该部分代码实现了对消息流的连接检测、编解码和具体的消息业务处理操作,这里我们需要关注的是NettyServerHandler,消息在经过前面几个ChannelHandler的处理之后,NettyServerHandler接受输入的已经被解码的消息,触发它的channelRead0函数,调用processMessageReceived函数对消息进行业务处理。
RocketMQ的线程组织
processMessageReceived函数会根据收到的消息类型不同,调用不同的processor对该消息进行处理,该部分代码如图所示:
对于每一种消息类型,RocketMQ都定义了对应的处理代码(processor),并且定义了对应的线程池用于执行处理代码,而这些消息类型到processor和线程池的映射关系,在模块的初始化时都已经创建完毕,并保存在NettyRemotingAbstract的processorTable当中,如下图是broker在初始化时调用registerProcessor函数注册自己能处理的所有消息类型的processor和线程池。
综上,RocketMQ处理消息的接收、处理和结果返回的大致流程如下图:
因此,RocketMQ各个模块的线程模型也较为简单,定义若干个ExecuteService用于执行各类消息的业务处理代码,而具体的业务处理代码都写在对应的processor。具体每个模块的processor定义,将在后续的源码解析中给出。