通常来说我们对分布式队列有高可靠性的要求,所以数据要进行持久化存储。
- 消息生产者发送消息到MQ。
- MQ收到消息,将消息进行持久化,即在存储系统中新增一条记录。
- 返回ACK确认消息给生产者。
- 然后MQ推送消息给对应的消费者,等待消费者返回ACK。
- 如果消息消费者在指定时间内成功返回ACK,那么MQ认为消息消费成功,在存储系统中删除消息,即执行第6步;如果MQ在指定时间内没有收到ACK,则认为消息消费失败,会尝试重新推送消息,重复执行4、5、6步骤。
上图中的存储系统
- 关系型数据库DB
存储系统可选用JDBC的方式来做消息持久化,通过简单的xml配置信息即可实现JDBC消息存储。由于普通关系型数据库(如Mysql)在单表数据量达到千万级别的情况下,其IO读写性能往往会出现瓶颈。在可靠性方面,该种方案非常依赖DB,如果一旦DB出现故障,则MQ的消息就无法落盘存储会导致线上故障。
- 文件系统
目前业界较为常用的几款产品(RocketMQ/Kafka/RabbitMQ)均采用的是消息刷盘至所部署虚拟机/物理机的文件系统来做持久化(刷盘一般可以分为异步刷盘和同步刷盘两种模式)。消息刷盘为消息存储提供了一种高效率、高可靠性和高性能的数据持久化方式。除非部署MQ机器本身或是本地磁盘挂了,否则一般是不会出现无法持久化的故障问题。
性能对比:文件系统>关系型数据库DB
消息存储---刷盘机制
RocketMQ的消息是存储到磁盘上的,这样既能保证断电后恢复, 又可以让存储的消息量超出内存的限制。RocketMQ为了提高性能,会尽可能地保证磁盘的顺序写。消息在通过Producer写入RocketMQ的时候,有两种写磁盘方式,分布式同步刷盘和异步刷盘。
同步刷盘
在返回写成功状态时,消息已经被写入磁盘。具体流程是,消息写入内存的PAGECACHE后,立刻通知刷盘线程刷盘, 然后等待刷盘完成,刷盘线程执行完成后唤醒等待的线程,返回消息写成功的状态。
异步刷盘
在返回写成功状态时,消息可能只是被写入了内存的PAGECACHE,写操作的返回快,吞吐量大;当内存里的消息量积累到一定程度时,统一触发写磁盘动作,快速写入
不管是同步刷盘还是异步刷盘,都是通过Broker配置文件里的flushDiskType参数设置的,这个参数被配置成SYNC_FLUSH、ASYNC_FLUSH中的 一个。