概述
本文是个人学习笔记,参考《RocketMQ技术内幕》及RocketMQ源码4.2.0(https://github.com/apache/rocketmq/tree/release-4.2.0)
结论
生产环境如果开启了自动创建Topic,会有3个影响
- 用户指定的读写队列数可能不是预期结果。创建的Topic的读队列数和写队列数取值为默认Topic(“TBW102”)的读队列数和Produce端设置的队列数的最小值。
- 不能保证所有的Broker上都能注册该Topic。Broker创建了新的Topic后,新的Topic信息马上被同步给了Namesrv,然后其他的Producer也刚好都从Namesrv同步了该Topic的路由信息,则接下来所有的Producer都只会向这一个Broker发送消息,其他Broker也就不会再有机会创建新Topic。
- 启用Broker故障延迟机制时,由于不是所有的Broker上都创建了同一Topic,可能会导致故障延迟命中失败。
读写队列
Producer端默认的读写队列数都是4,Broker端默认主题“TBW102”的读写队列数都是16。新建的Topic的读写队列取两者最小值,在Broker端创建Topic的代码TopicConfigManager#createTopicInSendMessageMethod如下:
第168行是获取新Topic读写队列数的逻辑,其中clientDefaultTopicQueueNums是Producer端用户配置的队列数。
Broker创建Topic
Broker默认是可以在消息发送过程中自动创建Topic的,即autoCreateTopicEnable=true。
Producer获取Topic路由的途径有两个,1)通过定时任务每间隔30s从Namesrv同步;2)在发送消息过程中主动请求Namesrv。此处我们假设发送消息的时候Topic不存在,我们以非事务消息、非批量发送、非顺序消息、同步发送的方式进行源码解读。Producer消息发送最终会走到DefaultMQProducerImpl#sendDefaultImpl,代码如下:
447行是在消息发送前查找路由,首先查的是本地缓存,其次查询的是远程Namesrv。tryToFindTopicPublishInfo内部会先用真实Topic去查路由,发现没有后再用默认Topic(“TBW102”)去查路由信息,那为什么默认主题能查到路由呢?因为在Broker启动的时候,当autoCreateTopicEnable=true,会创建该主题,具体代码在Broker端的TopicConfigManager构造函数中,如下:
接下来Producer会使用默认主题路由信息中的Broker地址进行消息发送。所有的消息请求头中都会存储默认主题及客户端配置的读写队列数。完成了一次发送后,接下来去看一下Broker端是如何处理这个消息的。
在Broker端,一个消息处理的流程在SendMessageProcessor中,依次执行的方法是processRequest()->sendMessage()->msgCheck(),SendMessageProcessor#msgCheck代码如下:
179行是用真正的Topic查路由,查不到,然后走到192行去创建Topic,TopicConfigManager#createTopicInSendMethod代码片段在上一章“读写队列”中已经罗列,这里要说明的是新的主题创建后马上会被持久化并注册到Namesrv,逻辑同样在createTopicInSendMethod方法内,代码如下(202行、213行):
在213行,一旦Broker成功注册到Namesrv,其他Producer就可能同步到该路由信息,接下来他们就会只给这同一个Broker发消息。
故障延迟
什么是故障延迟
Producer如果往某个Broker发送消息失败了,就认为该Broker发生了故障,接下来(的30s)该客户端上的所有消息就都不再往该Broker发送了。
故障如何清除
- Broker在120s内不能成功发送心跳到Namesrv,会被Namesrv清除。然后Producer每间隔30s会从Namesrv更新一次Topic路由信息,自然不包含故障Broker。
- Broker故障恢复,30s后被继续使用。
如何配置
设置DefaultMQProducer.sendLatencyFaultEnable=true。
细节展开
rmq的消息发送会调用DefaultMQProducerImpl#sendDefaultImpl,其代码如下:
第465行和484行都是向容错策略里面写入Broker不可用时间(注意这里是Broker维度的,可以想象一下,只要有一个Topic向所有的Broker发送过消息,这里的容错策略里面存储的就是全量的Broker容错信息)。发送成功和发送失败调用的是同一个方法,区别在于isolation入参,它决定了容错时间是mq消息耗时还是直接固定的30s。这里有一个细节,容错时间实际上是固定的几个值,类似于延迟级别,详见MQFaultTolerance.notAvaliableDuration= {0L, 0L, 30000L, 60000L, 120000L, 180000L, 600000L}。当传入的延迟时间大于notAvaliableDuration的某个值时,实际上直接取了这个固定值。
在第457行是路由选择的逻辑,这里匹配到的MessageQueue会定位到具体的Broker及队列编号。MQFaultStrategy#selectOneMessageQueue代码如下:
同一个消息在第一次发送失败后,第二次进入该方法,lastBrokerName会被赋值为失败的Broker,然后在67行和68行规避故障Broker。
其他消息第一次发送时候,故障的Broker也会在67行被规避。
如果走到了73行,说明消息失败重试过程中故障的Broker还未恢复,这里选择出来的notBestBroker是所有往当前MQClientInstance实例发送过消息的Broker中的一个,具体步骤如下:
- 从Broker容错表中取出所有Broker信息(封装在FaultItem)放入列表。
- 打乱后重排序Broker信息列表,取前一半数据,然后根据上一次累计使用的数字取模来获得当前要用的FaultItem。
如果某个Topic不是在所有的Broker上都创建了,这里会出现取出来的Broker上没有该Topic信息,接下来在75行会发现可写队列数无效,然后代码走到83行,直接从容错策略中清除了该Broker,如果刚好这个Broker也故障了,就会导致了容错策略失效。