chenyanbin

导读

  之前学过ActiveMQ但是并发量不是很大点我直达,所以又学阿里开源的RocketMQ,据说队列可以堆积亿级别。下面是网上找的消息队列对比图,仅供参考

部署

官网

点我直达

前置条件

  1. 推荐使用64位操作系统,建议使用Linux / Unix / Mac;
  2. 64位JDK 1.8+;
  3. Maven 3.2.x;
  4. Git;
  5. 适用于Broker服务器的内存4G +可用磁盘

下载

地址:https://downloads.apache.org/rocketmq/4.7.1/rocketmq-all-4.7.1-source-release.zip 

百度云盘:

链接: https://pan.baidu.com/s/1luq_MwxSn8k_bugrnQSJWg  密码: varj

安装依赖项

  1. jdk:点我直达
  2. maven:点我直达
  3. git安装:yum install -y git
export JAVA_HOME=/opt/soft/jdk1.8.0_202
export PATH=$JAVA_HOME/bin:$PATH
export CLASPATH=.:$JAVA_home/lib/dt.jar:$JAVA_HOME/lib/tools.jar
export JAVA_HOME PATH CLASSPATH
export MAVEN_HOME=/opt/soft/apache-maven-3.6.3
export PATH=$PATH:$MAVEN_HOME/bin

mq上传至linux

解压

 

maven编译

 

启动NameServer

后台启动方式

nohup sh bin/mqnamesrv &

NameServer启动时内存不足(问题解决)

找到runserver.sh 修改JAVA_OPT

vim /bin/runserver.sh配置

启动Broker

nohup sh bin/mqbroker -n localhost:9876 &

语法:nohup sh bin/mqbroker -n NameServer服务ip地址

 

Broker内存不足(问题解决)

找到runbroker.sh 修改JAVA_OPT

vim /bin/runbroker.sh配置

服务都启动成功

 

模拟消费

export NAMESRV_ADDR=localhost:9876

sh bin/tools.sh org.apache.rocketmq.example.quickstart.Producer

sh bin/tools.sh org.apache.rocketmq.example.quickstart.Consumer

开2个控制台,连接通一台linux

注意

  NameServer默认端口号:9876;broker默认端口号:10911

可视化控制台

官网地址

点我直达

百度云盘

链接: https://pan.baidu.com/s/1mdEGkq-JBTy1wtNmFPkmDg  密码: v6bq

解压

安装编译

进入:/opt/soft/rocketmq-externals-master/rocketmq-console
编译: mvn clean package -Dmaven.test.skip=true

修改appliccation.properties的rocketmq.config.namesrvAddr

编译打包

启动

  进入target目录,启动java -jar

守护进程启动: nohup java -jar rocketmq-console-ng-2.0.0.jar &

SpringBoot整合RocketMQ(生产者)

创建SpringBoot项目

点我直达

项目结构

加入依赖

pom.xml

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.3.1.RELEASE</version>
        <relativePath/> <!-- lookup parent from repository -->
    </parent>
    <groupId>com.ybchen</groupId>
    <artifactId>ybchen-mq</artifactId>
    <version>0.0.1-SNAPSHOT</version>
    <name>ybchen-mq</name>
    <description>Demo project for Spring Boot</description>

    <properties>
        <java.version>1.8</java.version>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
            <exclusions>
                <exclusion>
                    <groupId>org.junit.vintage</groupId>
                    <artifactId>junit-vintage-engine</artifactId>
                </exclusion>
            </exclusions>
        </dependency>
        <!--注意: 这里的版本,要和部署在服务器上的版本号一致-->
        <dependency>
            <groupId>org.apache.rocketmq</groupId>
            <artifactId>rocketmq-client</artifactId>
            <version>4.7.1</version>
        </dependency>
        <dependency>
            <groupId>org.apache.rocketmq</groupId>
            <artifactId>rocketmq-common</artifactId>
            <version>4.7.1</version>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
            </plugin>
        </plugins>
    </build>

</project>

PayProducer.java

package com.ybchen.ybchenmq.jms;

import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.springframework.stereotype.Component;

/**
 * 消息生产者
 */
@Component
public class PayProducer {
    /**
     * 生产者所属的组
     */
    private String producerGroup = "pay_group";
    /**
     * MQ的地址,注意需开放端口号或者关闭防火墙
     */
    private String nameServerAddr = "192.168.199.100:9876";
    private DefaultMQProducer producer;

    public PayProducer() {
        producer = new DefaultMQProducer(producerGroup);
        //指定NameServer地址,多个地址以;隔开
        //如 producer.setNamesrvAddr("192.168.199.100:9876;192.168.199.101:9876;192.168.199.102:9876")
        producer.setNamesrvAddr(nameServerAddr);
        start();
    }

    /**
     * 获取生产者
     * @return
     */
    public DefaultMQProducer getProducer() {
        return this.producer;
    }

    /**
     * 开启,对象在使用之前必须要调用一次,只能初始化一次
     */
    public void start() {
        try {
            this.producer.start();
        } catch (MQClientException e) {
            e.printStackTrace();
        }
    }

    /**
     * 关闭,一般在应用上下文,使用上下文监听器,进行关闭
     */
    public void shutdown() {
        this.producer.shutdown();
    }
}

PayController.java

package com.ybchen.ybchenmq.controller;

import com.ybchen.ybchenmq.jms.PayProducer;
import org.apache.rocketmq.client.exception.MQBrokerException;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.remoting.exception.RemotingException;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

/**
 * @ClassName:PayController
 * @Description:支付
 * @Author:chenyb
 * @Date:2020/10/18 2:47 下午
 * @Versiion:1.0
 */
@RestController
@RequestMapping("/api/v1")
public class PayController {
    @Autowired
    private PayProducer payProducer;

    private static final String TOPIC = "ybchen_pay_topic";

    /**
     * 支付回调
     *
     * @param text
     * @return
     */
    @RequestMapping("pay_cb")
    public Object callback(String text) {
        /**
         * String topic:话题
         * String tags:二级分类
         * byte[] body:body消息字节数组
         */
        Message message = new Message(TOPIC,"tag_a",("hello ybchen ==>"+text).getBytes());
        try {
            SendResult send = payProducer.getProducer().send(message);
            System.out.println("send------>"+send);
        } catch (MQClientException e) {
            e.printStackTrace();
        } catch (RemotingException e) {
            e.printStackTrace();
        } catch (MQBrokerException e) {
            e.printStackTrace();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        return "ok";
    }
}

测试

常见错误

错误一

org.apache.rocketmq.remoting.exception.RemotingTooMuchRequestException:
sendDefaultImpl call timeout
原因:阿里云存在多网卡,rocketmq会根据当前网卡选择一个IP使用,当你的机器有多块网卡时,很可能会有问题,比如,机器上有两个ip,一个公网ip,一个私网ip,因此需要配置broker.conf指定当前公网的ip,然后重启broker


修改配置:/opt/soft/rocketmq-all-4.7.1-source-release/distribution/target/rocketmq-4.7.1/rocketmq-4.7.1/conf/broker.conf
新增这个配置:brokerIP1=xxx.xxx.xxx.xxx

启动命令:nohup sh bin/mqbroker -n localhost:9876 -c ./conf/broker.conf &

错误2

MQClientException: No route info of this topic, TopicTest1

原因:Broker 紧追自动创建Topic,且用户没有通过手工方式创建此Topic,或者broker和Nameserver网络不通

解决:
    通过sh bin/mqbroker -m 查看配置
    autoCreateTopicEnable=true 则自动创建Topic

Centos 7 关闭防火墙:systemctl stop firewalld

错误3

控制台查看不了数据,提示连接10909错误

原因:Rocket默认开启了VIP通道,VPI通道端口号为10911-2=10909

解决:阿里云安全组添加一个端口:10909

错误4

  无法自动创建topic:客户端版本要和服务端版本保持一致

服务器上装的是4.7.1

引入依赖项时
        <!--注意: 这里的版本,要和部署在服务器上的版本号一致-->
        <dependency>
            <groupId>org.apache.rocketmq</groupId>
            <artifactId>rocketmq-client</artifactId>
            <version>4.7.1</version>
        </dependency>
        <dependency>
            <groupId>org.apache.rocketmq</groupId>
            <artifactId>rocketmq-common</artifactId>
            <version>4.7.1</version>
        </dependency>

检索消息发送

SpringBoot整合RocketMQ(消费者)

创建SpringBoot项目

 

项目结构

加入依赖

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.3.1.RELEASE</version>
        <relativePath/> <!-- lookup parent from repository -->
    </parent>
    <groupId>com.ybchen</groupId>
    <artifactId>ybchen-mq</artifactId>
    <version>0.0.1-SNAPSHOT</version>
    <name>ybchen-mq</name>
    <description>Demo project for Spring Boot</description>

    <properties>
        <java.version>1.8</java.version>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
            <exclusions>
                <exclusion>
                    <groupId>org.junit.vintage</groupId>
                    <artifactId>junit-vintage-engine</artifactId>
                </exclusion>
            </exclusions>
        </dependency>
        <!--注意: 这里的版本,要和部署在服务器上的版本号一致-->
        <dependency>
            <groupId>org.apache.rocketmq</groupId>
            <artifactId>rocketmq-client</artifactId>
            <version>4.7.1</version>
        </dependency>
        <dependency>
            <groupId>org.apache.rocketmq</groupId>
            <artifactId>rocketmq-common</artifactId>
            <version>4.7.1</version>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
            </plugin>
        </plugins>
    </build>

</project>

PayConsumer.java

package com.ybchen.ybchenmqconsumer.jms;

import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.common.message.MessageExt;
import org.springframework.stereotype.Component;

import java.io.UnsupportedEncodingException;
import java.util.List;

/**
 * @ClassName:PayConsumer
 * @Description:消费者
 * @Author:chenyb
 * @Date:2020/10/18 4:13 下午
 * @Versiion:1.0
 */
@Component
public class PayConsumer {
    /**
     * 生产者所属的组
     */
    private String producerGroup = "pay_consumer_group";
    /**
     * MQ的地址,注意需开放端口号或者关闭防火墙
     */
    private String nameServerAddr = "192.168.199.100:9876";
    /**
     * 订阅主题
     */
    private String topic = "ybchen_pay_topic";
    private DefaultMQPushConsumer consumer;

    public PayConsumer() throws MQClientException {
        consumer = new DefaultMQPushConsumer(producerGroup);
        //指定NameServer地址,多个地址以;隔开
        //如 producer.setNamesrvAddr("192.168.199.100:9876;192.168.199.101:9876;192.168.199.102:9876")
        consumer.setNamesrvAddr(nameServerAddr);
        //设置消费地点,从最后一个开始消费
        consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET);
        //订阅主题,监听主题下的那些标签
        consumer.subscribe(topic, "*");
        //注解一个监听器
        //lambda方式
//        consumer.registerMessageListener((MessageListenerConcurrently) (msg, context) -> {
//            try {
//                Message message = msg.get(0);
//                System.out.printf("%s Receive New Messages: %s %n",
//                        Thread.currentThread().getName(), new String(msg.get(0).getBody()));
//                //主题
//                String topic = message.getTopic();
//                //消息内容
//                String body = null;
//                body = new String(message.getBody(), "utf-8");
//                //二级分类
//                String tags = message.getTags();
//                ////                String keys = message.getKeys();
//                System.out.println("topic=" + topic + ", tags=" + tags + ", keys=" + keys + ", msg=" + body);
//                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
//            } catch (UnsupportedEncodingException e) {
//                e.printStackTrace();
//                return ConsumeConcurrentlyStatus.RECONSUME_LATER;
//            }
//        });

        //一般方式
        consumer.registerMessageListener(new MessageListenerConcurrently() {
            @Override
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext consumeConcurrentlyContext) {
                try {
                    Message message = list.get(0);
                    System.out.printf("%s Receive New Messages: %s %n",
                            Thread.currentThread().getName(), new String(list.get(0).getBody(),"utf-8"));
                    //主题
                    String topic = message.getTopic();
                    //消息内容
                    String body = null;
                    body = new String(message.getBody(), "utf-8");
                    //二级分类
                    String tags = message.getTags();
                    //
                    String keys = message.getKeys();
                    System.out.println("topic=" + topic + ", tags=" + tags + ", keys=" + keys + ", msg=" + body);
                    return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
                } catch (UnsupportedEncodingException e) {
                    e.printStackTrace();
                    return ConsumeConcurrentlyStatus.RECONSUME_LATER;
                }
            }
        });
        consumer.start();
        System.out.println("consumer start ..........");
    }
}

application.properties

server.port=8081

测试生产者消费者

MQ集群架构模式分析

单节点

优点

  本地开发测试,配置简单,同步刷盘消息一条都不会丢

缺点

  不可靠,如果宕机,会导致服务不可用

主从(异步、同步双写)

优点

  同步双写消息不丢失,异步复制存在少量丢失你,主节点宕机,从节点可以对外提供消息的消费,但是不支持写入

缺点

  主备有短暂消息延迟,毫秒级,目前不支持自动切换,需要脚本或者其他程序进行检测然后停止broker,重启让从节点成为主节点

双主

优点

  配置简单,可以靠配置RAID磁盘阵列保证消息可靠,异步刷盘丢失少量消息

缺点

  master宕机期间,未被消费的消息在机器恢复之前不可消息,实时性会受到影响

双主双从,多主多从模式(异步复制)

优点

  磁盘损坏,消息丢失的非常小,消息实时性不会受影响,Master宕机后,消费者仍然可以从Slave消费

缺点

  主备有短暂消息延迟,毫秒级,如果Master宕机,磁盘损坏情况,会丢失你少量消息

双主双从,多主多从模式(同步双写)

优点

  同步双写方式,主备都写成功,才向应用返回成功,服务可用性与数据可用性非常高

缺点

  性能比异步复制模式略低,主宕机后,备机不能自动切换为主机

推荐

  1. 主从(异步、同步双写)
  2. 双主双从,多主多从模式(异步复制)
  3. 双主双从,多主多从模式(同步双写)

主从集群搭建

准备工作

  准备2台机器,ip地址分别为:192.168.199.100;192.168.199.101;

  环境:RocketMQ4.7.1+jdk8+Maven+Centos 7

启动两台nameserver

  启动两个机器的nameserver

路径:/opt/soft/rocketmq-all-4.7.1-source-release/distribution/target/rocketmq-4.7.1/rocketmq-4.7.1

启动:nohup sh bin/mqnamesrc &

编辑并启动roccketmq

主节点

进入:/opt/soft/rocketmq-all-4.7.1-source-release/distribution/target/rocketmq-4.7.1/rocketmq-4.7.1/conf/2m-2s-async


编辑并修改如下:vim broker-a.properties 
namesrvAddr=192.168.199.100:9876;192.168.199.101:9876
brokerClusterName=YbChenCluster
brokerName=broker-a
brokerId=0
deleteWhen=04
fileReservedTime=48
brokerRole=ASYNC_MASTER
flushDiskType=ASYNC_FLUSH


启动:nohup sh bin/mqbroker -c conf/2m-2s-async/broker/broker-a.properties &

从节点

进入:/opt/soft/rocketmq-all-4.7.1-source-release/distribution/target/rocketmq-4.7.1/rocketmq-4.7.1/conf/2m-2s-async


编辑并修改如下:vim broker-a-s.properties 
namesrvAddr=192.168.199.100:9876;192.168.199.101:9876
brokerClusterName=YbChenCluster
brokerName=broker-a
brokerId=1
deleteWhen=04
fileReservedTime=48
brokerRole=SLAVE
flushDiskType=ASYNC_FLUSH


启动:nohup sh bin/mqbroker -c conf/2m-2s-async/broker/broker-a-s.properties &

注意事项

  1. namesrvAddr:相同
  2. brokerClusterName:相同
  3. brokerName:相同
  4. brokerId:不同,0是主节点
  5. deleteWhen:相同
  6. fileReservedTime:相同
  7. brokerRole:不同,分ASYNC_MASTER、SLAVE
  8. flushDiskType:相同

启动broker

使用管控台

  使用192.168.199.100这台服务器,修改配置

192.168.199.100这台服务器

进入:/opt/soft/rocketmq-externals-master/rocketmq-console/src/main/resources


修改配置文件:vim application.properties

rocketmq.config.namesrvAddr=192.168.199.100:9876;192.168.199.101:9876


编译

切换到:/opt/soft/rocketmq-externals-master/rocketmq-console
打包:
mvn clean
mvn install -Dmaven.test.skip=true


启动

进入:/opt/soft/rocketmq-externals-master/rocketmq-console/target
守护进程方式启动:nohup java -jar rocketmq-console-ng-2.0.0.jar &

集群测试

故障演练

  模拟主挂了,但是从还可以被消费,此时不能写入,等主重启后,可以继续写入(数据不会被重复消费),以下内容是连续的

总结

  好了,到目前为止,主从已经搭建完成了。

  Broker分Master和Slave一个Master可以对应多个Slave,但一个Slave只能对应一个Master,Master与Slave通过相同的Broker Name来匹配,不同的Broker id来定义时Master还是Slave

    Broker向所有的NameServer节点建立长连接,定时注册Topic和发送元数据信息

    NameServer定时扫描(默认2分钟)所有存活Broker的连接,如果超过时间没响应,则断开连接(心跳检测),但是Consumer客户端不能感知,Consumer定时(30秒)从NameServer获取topic的最新信息,所以broker不可用时,Consumer最多需要30秒才能发现

  只有Master才能进行写入操作Slave不允许写入只能同步,同步策略取决于Master配置

  客户端消费可以从Master和Slave消费,默认消费者都从Master消费,如果在Master挂了之后,客户端从NameServer中感知Broker宕机,就会从Slave消费,感知非实时,存在一定的滞后性,Slave不能保证Master的100%都同步过来,会有少量的消息丢失。一旦Master恢复,未同步过去的消息会被最终消费掉。

  如果Consumer实例的数量比Message Queue的总数量还多的话,多出来的Consumer实例将无法分到Queue,也就无法消费到消息,也就无法起到分摊负载的作用,所以需要控制让Queue的总数量大于Consumer的数量。

场景模拟

生产和消费重试及处理

生产者重试

  • 消息重试(保证数据的高可靠性),本身内部支持重试,默认次数是2
  • 如果网络情况较差,或者跨集群则建议多改几次

生产者设置重试次数,并设置唯一的key(一般唯一标识符)

消费者重试

  • 原因:消息处理异常,broker端到consumer端各种问题,如网络原因闪断,消费处理失败,ACK返回失败等
  • 注意
    • 重试间隔时间配置,默认每条消息最多重试16次
    • 超过重试次数人工补偿
    • 消费端去重
    • 一条消息无论重试多少次,这些重试消息的Message ID,key不会改变
    • 消费重试只针对集群消费方式生效;广播方式不提供失败重试特性,即消费失败后,失败消息不再重试,继续消费新的消息

设置广播方式

模拟消息重发

异步发送消息和回调实战

应用场景

  比如12306付完钱

相关文章: