消息存储,高可用机制,负载均衡,消息重试,死信队列,消息幂等

消息存储:为了保障高可用需要持久化:

RocketMQ学习3-原理

存储介质:

1)关系型数据库DB:Apache下开源的另外一款MQ—ActiveMQ(默认采用的KahaDB做消息存储)可选用JDBC的方式来做消息持久化,通过简单的xml配置信息即可实现JDBC消息存储。

2)文件系统:(RocketMQ/Kafka/RabbitMQ)均采用的是消息刷盘至所部署虚拟机/物理机的文件系统来做持久化

存储过程读和写是如何保持高速的:

写:RocketMQ的消息用顺序写,保证了消息存储的速度:目前的高性能磁盘,顺序写速度可以达到600MB/s, 超过了一般网卡的传输速度。 磁盘随机写的速度只有大概100KB/s ,和顺序写的性能相差6000倍!

读:Linux操作系统分为【用户态】和【内核态】,文件操作、网络操作需要涉及这两种形态的切换 。一台服务器 把本机磁盘文件的内容发送到客户端,一般分为两个步骤:

1)read;读取本地文件内容;

2)write;将读取的内容通过网络发送出去。

这两个看似简单的操作,实际进行了4 次数据复制,分别是:

  1. 从磁盘复制数据到内核态内存;

  2. 从内核态内存复 制到用户态内存;

  3. 然后从用户态 内存复制到网络驱动的内核态内存;

  4. 最后是从网络驱动的内核态内存复 制到网卡中进行传输。RocketMQ学习3-原理

这种mmap也就是零拷贝省去向用户态的内存复制,提高速度。 RocketMQ充分利用了上述特性,提高消息存盘和网络发送的速度。 这种机制在Java中是通过MappedByteBuffer实现的,但采用MappedByteBuffer这种内存映射的方式有几个限制,其中之一是一次只能映射1.5~2G 的文件至用户态的虚拟内存,这也是为何RocketMQ默认设置单个CommitLog日志数据文件为1G的原因

消息存储结构:

RocketMQ消息的存储是由ConsumeQueue和CommitLog配合完成 的,消息真正的物理存储文件是CommitLog,ConsumeQueue是消息的逻辑队列,类似数据库的索引文件,存储的是指向物理存储的地址。每 个Topic下的每个Message Queue都有一个对应的ConsumeQueue文件。还有另外一个hash索引IndexFile:为了消息查询提供了一种通过key或时间区间来查询消息的方法,这种通过IndexFile来查找消息的方法不影响发送与消费消息的主流程,当消息达到CommitLog后,会通过ReputMessageService线程接近实时地将消息转发给消息消费队列文件与索引文件。为了安全起见,RocketMQ引入abort文件,记录Broker的停机是否是正常关闭还是异常关闭(正常关闭调用钩子方法删除abort文件,异常关闭这个文件则会存在),在重启Broker时为了保证CommitLog文件,消息消费队列文件与Hash索引文件的正确性,分别采用不同策略来恢复文件。

RocketMQ学习3-原理

刷盘机制:RocketMQ的消息是存储到磁盘上的 ,有两种写磁盘方式,分布式同步刷盘和异步刷盘:

1)同步刷盘
在返回写成功状态时,消息已经被写入磁盘。具体流程是,消息写入内存的PAGECACHE后,立刻通知刷盘线程刷盘, 然后等待刷盘完成,刷盘线程执行完成后唤醒等待的线程,返回消息写 成功的状态。
2)异步刷盘
在返回写成功状态时,消息可能只是被写入了内存的PAGECACHE(同CommitLog 同等大小堆外内存,该堆外内存将使用内存锁定,确保不会被置换到虚拟内存中去,消息首先追加到堆外内存(ByteBuffer ),然后CommitRealTimeService线程每隔200ms将ByteBuffer 提交到物理文件的内存映射(MappedByteBuffer ,MappedByteBuffer在内存中追加提交的内容,wrotePosition指针向后移动 )中,commit操作成功返回,将committedPosition位置恢复 ,然后FlushRealTimeService线程默认每500ms将MappedByteBuffer中新追加的内存刷写到磁盘。如果未开启transientStorePoolEnable,消息直接追加到物理文件直接映射文件中,然后刷写到磁盘中),然后启动专门的刷盘线程定时将内存中的文件数据刷写到磁盘。 写操作的返回快,吞吐量大;当内存里的消息量积累到一定程度时,统一触发写磁盘动作,快速写入。
3)配置
同步刷盘还是异步刷盘,都是通过Broker配置文件里的flushDiskType 参数设置的,这个参数被配置成SYNC_FLUSH、ASYNC_FLUSH中的 一个。

RocketMQ分布式集群是通过Master和Slave的配合达到高可用性:
Master和Slave的区别:在Broker的配置文件中,参数 brokerId的值为0表明这个Broker是Master,大于0表明这个Broker是 Slave,同时brokerRole参数也会说明这个Broker是Master还是Slave。Master角色的Broker支持读和写,Slave角色的Broker仅支持读,也就是 Producer只能和Master角色的Broker连接写入消息。读高可用:当Master不可用或者繁忙的时候,Consumer会被自动切换到从Slave 读。写高可用:在创建Topic的时候,把Topic的多个MessageQueue创建在多个Broker组上(相同Broker名称,不同brokerId的机器组成一个Broker组),这样当一个Broker组的Master不可用后,其他组的Master仍然可用,Producer仍然可以发送消息。(开源版)RocketMQ目前还不支持把Slave自动转成Master,如果机器资源不足,需要把Slave转成Master,则要手动停止Slave角色的Broker,更改配置文件,用新的配置文件启动Broker。向某一个broker发送消息失败会设置规避机制。

主从复制:消息需要从Master复制到Slave 上,有同步和异步两种复制方式。 同步复制方式是等Master和Slave均写 成功后才反馈给客户端写成功状态,Slave 数据完整,吞吐量低;异步复制方式是只要Master写成功 即可反馈给客户端写成功状态,Slave 数据可能丢失。通过Broker配置文件里的brokerRole参数进行设置的,这个参数可以被设置成ASYNC_MASTER、 SYNC_MASTER、SLAVE

负载均衡:

生产者:Producer端,每个实例在发消息的时候,默认会轮询所有的message queue发送,以达到让消息平均落在不同的queue上。而由于queue可以散落在不同的broker,所以消息就发送到不同的broker下。消息生产者(Producer)在发送消息时之前先从NameServer获取Broker服务器地址列表,然后根据负载均衡算法从列表中选择一台服务器进行发送。

消费者:集群模式下:在拉取的时候需要明确指定拉取哪一条message queue。而每当实例的数量有变更,都会触发一次所有实例的负载均衡(rebalance:RebalanceService线程来实现。一个MQClientInstance持有一个RebalanceService实现,并随着MQClientInstance的启动而启动,默认每隔20s进行一次消息队列负载 ),这时候会按照queue的数量和实例的数量平均分配queue给每个实例。一个queue只分给一个consumer实例,一个consumer实例可以允许同时分到不同的queue。 但是如果consumer实例的数量比message queue的总数量还多的话,多出来的consumer实例将无法分到queue,也就无法消费到消息,也就无法起到分摊负载的作用了。所以需要控制让queue的总数量大于等于consumer的数量。广播模式下:要求一条消息需要投递到一个消费组下面所有的消费者实例,所以也就没有消息被分摊消费的说法。

消息传递方式:推模式、拉模式。所谓的拉模式,是消费端主动拉起拉消息请求,而推模式是消息达到消息服务器后,推送给消息消费者。RocketMQ消息推模式的实现基于拉模式,在拉模式上包装一层,一个拉取任务完成后开始下一个拉取任务,循环向消息服务端发起消息拉取请求 。RocketMQ使用一个单独的线程PullMessageService来负责消息的拉取 ,PullMessageService根据RebalanceService线程创建的拉取任务 从消息服务器默认每次拉取32条消息,按照消息的队列偏移量顺序存放在ProcessQueue中,PullMessageService然后将消息提交到消费者消费线程池(ConsumeMessageService 使用线程池来消费消息,确保了消息拉取与消息消费的解耦。ConsumeMessageService支持顺序消息和并发消息,区别是消息消费时必须成功锁定消息消费队列,在Broker端会存储消息消费队列的锁占用情况 ),消息成功消费后从ProcessQueue中移除。(ProcessQueue是MessageQueue在消费端的重现、快照 )

拉取消息可设置是否开启长轮询,如果不启用长轮询机制,则会在服务端等待shortPollingTimeMills时间后(挂起)再去判断消息是否已经到达指定消息队列,如果消息仍未到达则提示拉取消息客户端PULL—NOT—FOUND(消息不存在)。开启长轮询:每隔5s(或有消息到达时)轮询检查一次消息是否可达,同时一有消息达到后立马通知挂起线程

消息重试:

顺序消息失败后会自动不断进行消息重试,每次间隔1秒。对于无须消息可以通过设置返回状态达到消息重试的结果。 无序消息的重试只针对集群消费方式生效;广播方式不提供失败重试特性。RocketMQ 默认允许每条消息最多重试 16 次(可自定义),间隔从10s到2小时。总和最多到 4 小时 46 分钟,超过这个时间范围消息将不再重试投递。

在集群模式小消费者监听设置返回状态有三种选择:
//方式1:返回 Action.ReconsumeLater,消息将重试(推荐)
return Action.ReconsumeLater;
//方式2:返回 null,消息将重试
return null;
//方式3:直接抛出异常, 消息将重试
throw new RuntimeException("XXX");

return Action.CommitMessage;则不会重试、message.getReconsumeTimes()返回重试次数。

死信队列:

正常情况下无法被消费的消息(超过重试次数)称为死信消息,有效期与正常消息相同,均为 3 天,3 天后会被自动删除(磁盘空间不足或者默认凌晨4点删除过期文件,文件保存72小时并且在删除文件时并不会判断该消息文件上的消息是否被消费。)。 一个死信队列对应一个 Group ID, 而不是对应单个消费者实例。 排查可疑因素并解决问题后,可以在消息队列 RocketMQ 控制台重新发送该消息,让消费者重新消费一次。

消息幂等:

如果网络抖动,生产者推MQ没有收到确认返回会再推一遍,MQ没有收到消费者确认返回会再投递,负载均衡时Broker 重启以及订阅方应用重启 Rebalance也会导致消息重复。

Message ID 有可能出现冲突(重复)的情况,所以真正安全的幂等处理,不建议以 Message ID 作为处理依据。 最好的方式是以业务唯一标识作为幂等处理的关键依据,而业务的唯一标识可以通过消息 Key 进行设置:

Message message = new Message();
message.setKey("ORDERID_100");

RocketMQ支持局部顺序消息消费,也就是保证同一个消息队列上的消息顺序消费。不支持消息全局顺序消费,如果要实现某一个主题的全局顺序消费,可以将该主题的队列数设置为1,牺牲高可用性。

 

定时消息是用scheduleAtFixedRate实现的,不支持任意精度的定时调度消息,只支持自定义的消息延迟级别,例如1s、2s、5s等,可通过在broker配置文件中设置messageDelayLevel。

 

原理:

nameServer启动步骤:1)解析配置文件,填充NameServerConfig、NettyServerConfig属性值,并创建NamesrvController 2)根据启动属性创建NamesrvController实例,并初始化该实例。NameServerController实例为NameServer核心控制器 3)在JVM进程关闭之前,先将线程池关闭,及时释放资源

一个Topic拥有多个消息队列,一个Broker为每一个主题创建4个读队列和4个写队列。多个Broker组成一个集群,集群由相同的多台Broker组成Master-Slave架构,brokerId为0代表Master,大于0为Slave。BrokerLiveInfo中的lastUpdateTimestamp存储上次收到Broker心跳包的时间。

路由注册和维护如下图

RocketMQ学习3-原理

路由发现是非实时的,当Topic路由出现变化后,NameServer不会主动推送给客户端,而是由客户端定时拉取主题最新的路由。 Broker失效,移除该Broker,关闭与Broker连接,同时更新topicQueueTablebrokerAddrTablebrokerLiveTablefilterServerTable

RocketMQ有两个触发点来删除路由信息:
NameServer定期扫描brokerLiveTable检测上次心跳包与当前系统的时间差,如果时间超过120s,则需要移除broker。
Broker在正常关闭的情况下,会执行unregisterBroker指令

消息消费者:

 

 

 

 

 

 

 

相关文章:

  • 2021-10-18
  • 2021-12-19
  • 2021-08-01
  • 2021-08-13
  • 2021-06-06
  • 2022-12-23
猜你喜欢
  • 2021-04-09
  • 2021-07-03
  • 2022-12-23
  • 2021-04-13
  • 2021-11-23
  • 2021-10-21
  • 2021-06-24
相关资源
相似解决方案