1.RocketMQ 是什么?

总结rocketmq的搭建

上图是一个典型的消息中间件收发消息的模型,RocketMQ也是这样的设计,简单说来,RocketMQ具有以下特点:

o 是一个队列模型的消息中间件,具有高性能、高可靠、高实时、分布式特点。

o Producer、Consumer、队列都可以分布式。

o Producer向一些队列轮流发送消息,队列集合称为Topic,Consumer如果做广播消费,则一个consumer实例消费这个Topic对应的所有队列,如果做集群消费,则多个Consumer实例平均消费这个topic对应的队列集合。

o 能够保证严格的消息顺序

o 提供丰富的消息拉取模式

o 高效的订阅者水平扩展能力

o 实时的消息订阅机制

o 亿级消息堆积能力

o 较少的依赖

2.RocketMQ 物理部署结构

总结rocketmq的搭建

如上图所示, RocketMQ的部署结构有以下特点:

o Name Server是一个几乎无状态节点,可集群部署,节点之间无任何信息同步。

o Broker部署相对复杂,Broker分为Master与Slave,一个Master可以对应多个Slave,但是一个Slave只能对应一个Master,Master与Slave的对应关系通过指定相同的BrokerName,不同的BrokerId来定义,BrokerId为0表示Master,非0表示Slave。Master也可以部署多个。每个Broker与Name Server集群中的所有节点建立长连接,定时注册Topic信息到所有Name Server。

o Producer与Name Server集群中的其中一个节点(随机选择)建立长连接,定期从Name Server取Topic路由信息,并向提供Topic服务的Master建立长连接,且定时向Master发送心跳。Producer完全无状态,可集群部署。

o Consumer与Name Server集群中的其中一个节点(随机选择)建立长连接,定期从Name Server取Topic路由信息,并向提供Topic服务的Master、Slave建立长连接,且定时向Master、Slave发送心跳。Consumer既可以从Master订阅消息,也可以从Slave订阅消息,订阅规则由Broker配置决定。

3.RocketMQ 逻辑部署结构

总结rocketmq的搭建

如上图所示,RocketMQ的逻辑部署结构有Producer和Consumer两个特点。

Producer Group

用来表示一个发送消息应用,一个Producer Group下包含多个Producer实例,可以是多台机器,也可以是一台机器的多个进程,或者一个进程的多个Producer对象。一个Producer Group可以发送多个Topic消息,Producer Group作用如下:

1. 标识一类Producer

2. 可以通过运维工具查询这个发送消息应用下有多个Producer实例

3. 发送分布式事务消息时,如果Producer中途意外宕机,Broker会主动回调Producer Group内的任意一台机器来确认事务状态。

Consumer Group

用来表示一个消费消息应用,一个Consumer Group下包含多个Consumer实例,可以是多台机器,也可以是多个进程,或者是一个进程的多个Consumer对象。一个Consumer Group下的多个Consumer以均摊方式消费消息,如果设置为广播方式,那么这个Consumer Group下的每个实例都消费全量数据。

4.RocketMQ 数据存储结构

总结rocketmq的搭建

如上图所示,RocketMQ采取了一种数据与索引分离的存储方法。有效降低文件资源、IO资源,内存资源的损耗。即便是阿里这种海量数据,高并发场景也能够有效降低端到端延迟,并具备较强的横向扩展能力。

RocketMQ安装前准备

注:官方文档上是从GitHub上跟新代码到本地编译的方式安装的,如果是直接下载安装包的方式安装,则不需要Maven和Git,本次安装采用下载安装包的方式安装,操作系统是CentOS7,JDK8,另外在JDK9上RocketMQ会出现启动nameserver异常,日志如下,vm创建失败,网上还没找到解决办法,等后续版本加强对JDK9的支持再用会比较好。

从Apache官网上下载RocketMQ的安装包,下载地址如下:

http://rocketmq.apache.org/dowloading/releases/

总结rocketmq的搭建

上传到服务器并解压:

unzip  rocketmq-all-4.2.0-incubating-bin-release.zip

cp  -r  rocketmq-all-4.2.0-incubating  /usr/local

rm  -rf  rocketmq-all-4.2.0-incubating

配置RocketMQ的环境变量,vi  /etc/profile 添加内容:

export  ROCKETMQ_HOME=/user/local/rocketmq-all-4.2.0-incubating

export  PATH=$ROCKETMQ_HOME/bin:$PATH

export  NAMESERV_ADDR=IP:9876

NAMESRV_ADDR可以不在此绑定,但必须在启动broker时加上“-n 服务器IP:端口”以绑定nameserver,9876是nameserver的默认端口。

输入命令source /etc/profile,使配置生效,RocketMQ的安装及环境变量配置就完成了。

配置文件在conf,下面有几个配置文件,分别是几种模式:

总结rocketmq的搭建

2m-noslave: 多Master模式

2m-2s-sync: 多Master多Slave模式,同步双写

2m-2s-async:多Master多Slave模式,异步复制

配置文件参数可以参考官网:

文档地址是http://rocketmq.apache.org/docs/rmq-deployment/

也可参考:

-------------------------------------------------分割线----------------------------------------------------------

#所属集群名字
brokerClusterName=rocketmq-cluster
#broker名字,注意此处不同的配置文件填写的不一样
brokerName=broker-a
#0 表示 Master, >0 表示 Slave
brokerId=0
#nameServer地址,分号分割
namesrvAddr=10.79.3.105:9876
#在发送消息时,自动创建服务器不存在的topic,默认创建的队列数
defaultTopicQueueNums=4
#是否允许 Broker 自动创建Topic,建议线下开启,线上关闭
autoCreateTopicEnable=true
#是否允许 Broker 自动创建订阅组,建议线下开启,线上关闭
autoCreateSubscriptionGroup=true
#Broker 对外服务的监听端口
listenPort=10911
#删除文件时间点,默认凌晨 4点
deleteWhen=04
#文件保留时间,默认 48 小时
fileReservedTime=120
#commitLog每个文件的大小默认1G
mapedFileSizeCommitLog=1073741824
#ConsumeQueue每个文件默认存30W条,根据业务情况调整
mapedFileSizeConsumeQueue=300000
#destroyMapedFileIntervalForcibly=120000
#redeleteHangedFileInterval=120000
#检测物理文件磁盘空间
diskMaxUsedSpaceRatio=88
#存储路径
storePathRootDir=/opt/rocketmqServer/alibaba-rocketmq/store
#commitLog 存储路径
storePathCommitLog=/opt/rocketmqServer/alibaba-rocketmq/store/commitlog
#消费队列存储路径存储路径
storePathConsumeQueue=/opt/rocketmqServer/alibaba-rocketmq/store/consumequeue
#消息索引存储路径
storePathIndex=/opt/rocketmqServer/alibaba-rocketmq/store/index
#checkpoint 文件存储路径
storeCheckpoint=/opt/rocketmqServer/alibaba-rocketmq/store/checkpoint
#abort 文件存储路径
abortFile=/opt/rocketmqServer/alibaba-rocketmq/store/abort
#限制的消息大小
maxMessageSize=65536
#flushCommitLogLeastPages=4
#flushConsumeQueueLeastPages=2
#flushCommitLogThoroughInterval=10000
#flushConsumeQueueThoroughInterval=60000
#Broker 的角色
#- ASYNC_MASTER 异步复制Master
#- SYNC_MASTER 同步双写Master
#- SLAVE
brokerRole=ASYNC_MASTER
#刷盘方式
#- ASYNC_FLUSH 异步刷盘
#- SYNC_FLUSH 同步刷盘
flushDiskType=ASYNC_FLUSH
#checkTransactionMessageEnable=false

#发消息线程池数量
#sendMessageThreadPoolNums=128
#拉消息线程池数量
#pullMessageThreadPoolNums=128

-------------------------------------------------分割线----------------------------------------------------------
如果nameserver要集群的话,要在namesrvAddr上列出所有nameserver节点的IP和端口;brokerRole要填写当前节点的角色有ASYNC_MASTER、SYNC_MASTER、SLAVE三种

启动nameserver:

在ROCKET_HOME路径下执行命令nohup sh mqnamesrv &

在启动状态执行查询命令tail -f ~/logs/rocketmqlogs/namesrv.log

启动broker:

在ROCKET_HOME路径下执行命令nohup sh mqbroker &

如果没有注册NAMESRV_ADDR执行nohup sh mqbroker -n服务器IP:9876 &

如有多个broker的配置文件,则还要加上启动哪个broker的配置文件,如:

nohup sh mqbroker -n 服务器IP:9876 -c $ROCKETMQ_HOME/conf/2m-noslave/broker-a.properties &

修改启动内存:

可使用双机主从把master跟slave分别放在不同机器上互相主从

启动如果提示内存不足,可以修改启动脚本参数,vim runboker.sh 

总结rocketmq的搭建

把nameserver和broker的内存都改小,当然在实际开发环境部署时也可以根据实际需求将其改大

tail -f ~/logs/rocketmqlogs/broker.log查看broker的启动状态

关闭nameserver和broker:

关闭nameserver在ROCKETMQ_HOME路径下执行以下命令:

sh mqshutdown namesrv

关闭broker在ROCKETMQ_HOME路径下执行以下命令:

sh mqshutdown broker

先关闭所有的broker后,再关闭nameserver

测试发送:

RocketMQ有自带的调试工具模拟Producer和Consumer

在ROCKETMQ_HOME路径下执行以下命令模拟Producer发消息:

sh tools.sh org.apache.rocketmq.example.quickstart.Producer

测试接收:

在ROCKETMQ_HOME路径下执行以下命令模拟Consumer发消息:

sh tools.sh org.apache.rocketmq.example.quickstart.Consumer

创建producer:

DefaultMQProducer producer =new DefaultMQProducer("testmq");
producer.setNamesrvAddr("ip:9876");
producer.start();
//生产消息
String str = "Hello RocketMQ!------" +"45s25a2s5q2wwwq2waaaa";
Message msg = new Message("qch_20170706",str.getBytes());
SendResult result=producer.send(msg);
System.out.print(result.getMessageQueue());
System.out.print(result.getSendStatus());
System.out.print(result.getTransactionId());
System.out.print(result.getQueueOffset());
//停止Producer
producer.shutdown();

具体的查看返回对象消息:

总结rocketmq的搭建

成功!

相关文章: