建议先了解为什么项目要使用 MQ 消息队列,MQ 消息队列有什么优点,如果在业务逻辑上没有此种需求,建议不要使用中间件。中间件对系统的性能做优化的同时,同时增加了系统的复杂性也维护难易度;其次,需要了解各种常见的 MQ 消息队列有什么区别,以便在相同的成本下选择一种最合适本系统的技术。

本文主要讨论 RabbitMQ,从3月底接触一个项目使用了 RabbitMQ,就开始着手学习,主要通过视频和博客学习了一个月,基本明白了 RabbitMQ 的应用,其它的 MQ 队列还不清楚,其底层技术还有待学习,以下是我目前的学习心得。

RabbitMQ 是基于 Erlang 语言写的,所以首先安装 Erlang,本例是在 Windows 上安装,也可以选择在 Linux 上安装,机器上没有虚拟机,直接在 Windows 上操作,建议在 Linux 上安装。官方下载 Erlang 软件,我下载最新版本 21.3。安装过程很简单,直接 Next 到底。 Linux 安装自行谷歌。如下图:

java基础(六):RabbitMQ 入门
java基础(六):RabbitMQ 入门
java基础(六):RabbitMQ 入门安装结束后,设置环境变量,如下图
java基础(六):RabbitMQ 入门 
java基础(六):RabbitMQ 入门测试是否安装成功

2.安装 RabbitMQ

官方下载,选择最新版本 3.7。安装过程很简单,直接 Next 到底。如下图:

 

执行 rabbitmq-server start 命令,启动服务。本地登陆并创建用户,如下图:

java基础(六):RabbitMQ 入门java基础(六):RabbitMQ 入门关于tags标签的解释:

1、  超级管理员(administrator)

可登陆管理控制台,可查看所有的信息,并且可以对用户,策略(policy)进行操作。

2、  监控者(monitoring)

可登陆管理控制台,同时可以查看rabbitmq节点的相关信息(进程数,内存使用情况,磁盘使用情况等)

3、  策略制定者(policymaker)

可登陆管理控制台, 同时可以对policy进行管理。但无法查看节点的相关信息(上图红框标识的部分)。

4、  普通管理者(management)

仅可登陆管理控制台,无法看到节点信息,也无法对策略进行管理。

5、  其他

无法登陆管理控制台,通常就是普通的生产者和消费者。

4.JAVA 操作RabbitMQ

参考 RabbitMQ 官网,一共分为6个模式

RabbitMQ 是一个消息代理,实际上,它接收生产者产生的消息,然后将消息传递给消费者。在这个过程中,它可以路由、缓冲、持久化等,在传输过程中,主要又三部分组成。

生产者:发送消息的一端

java基础(六):RabbitMQ 入门
队列:它活动在 RabbitMQ 服务器中,消息存储的地方,队列本质上是一个缓冲对象,所以存储的消息不受限制
消费者:消息接收端
首先准备操作 MQ 的环境

(1): 准备必要的 Pom 文件,导入相应的 jar 包,

  1 <?xml version="1.0" encoding="UTF-8"?>
  2 
  3 <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
  4   xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
  5   <modelVersion>4.0.0</modelVersion>
  6 
  7   <groupId>com.edu</groupId>
  8   <artifactId>rabbitmqdemo</artifactId>
  9   <version>1.0</version>
 10 
 11   <name>rabbitmqdemo</name>
 12   <!-- FIXME change it to the project's website -->
 13   <url>http://www.example.com</url>
 14 
 15   <properties>
 16     <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
 17     <maven.compiler.source>1.7</maven.compiler.source>
 18     <maven.compiler.target>1.7</maven.compiler.target>
 19   </properties>
 20 
 21   <dependencies>
 22     <!--测试包-->
 23     <dependency>
 24       <groupId>junit</groupId>
 25       <artifactId>junit</artifactId>
 26       <version>4.11</version>
 27       <scope>test</scope>
 28     </dependency>
 29     <!--mq客户端-->
 30     <dependency>
 31       <groupId>com.rabbitmq</groupId>
 32       <artifactId>amqp-client</artifactId>
 33       <version>4.5.0</version>
 34     </dependency>
 35     <!--日志-->
 36     <dependency>
 37       <groupId>org.slf4j</groupId>
 38       <artifactId>slf4j-log4j12</artifactId>
 39       <version>1.7.25</version>
 40     </dependency>
 41     <!--工具包-->
 42     <dependency>
 43       <groupId>org.apache.commons</groupId>
 44       <artifactId>commons-lang3</artifactId>
 45       <version>3.3.2</version>
 46     </dependency>
 47     <!--spring集成-->
 48     <dependency>
 49       <groupId>org.springframework.amqp</groupId>
 50       <artifactId>spring-rabbit</artifactId>
 51       <version>1.7.6.RELEASE</version>
 52     </dependency>
 53     <dependency>
 54       <groupId>org.springframework</groupId>
 55       <artifactId>spring-test</artifactId>
 56       <version>4.3.7.RELEASE</version>
 57     </dependency>
 58       <dependency>
 59           <groupId>junit</groupId>
 60           <artifactId>junit</artifactId>
 61           <version>RELEASE</version>
 62           <scope>compile</scope>
 63       </dependency>
 64   </dependencies>
 65 
 66   <build>
 67     <pluginManagement><!-- lock down plugins versions to avoid using Maven defaults (may be moved to parent pom) -->
 68       <plugins>
 69         <!-- clean lifecycle, see https://maven.apache.org/ref/current/maven-core/lifecycles.html#clean_Lifecycle -->
 70         <plugin>
 71           <artifactId>maven-clean-plugin</artifactId>
 72           <version>3.1.0</version>
 73         </plugin>
 74         <!-- default lifecycle, jar packaging: see https://maven.apache.org/ref/current/maven-core/default-bindings.html#Plugin_bindings_for_jar_packaging -->
 75         <plugin>
 76           <artifactId>maven-resources-plugin</artifactId>
 77           <version>3.0.2</version>
 78         </plugin>
 79         <plugin>
 80           <artifactId>maven-compiler-plugin</artifactId>
 81           <version>3.8.0</version>
 82         </plugin>
 83         <plugin>
 84           <artifactId>maven-surefire-plugin</artifactId>
 85           <version>2.22.1</version>
 86         </plugin>
 87         <plugin>
 88           <artifactId>maven-jar-plugin</artifactId>
 89           <version>3.0.2</version>
 90         </plugin>
 91         <plugin>
 92           <artifactId>maven-install-plugin</artifactId>
 93           <version>2.5.2</version>
 94         </plugin>
 95         <plugin>
 96           <artifactId>maven-deploy-plugin</artifactId>
 97           <version>2.8.2</version>
 98         </plugin>
 99         <!-- site lifecycle, see https://maven.apache.org/ref/current/maven-core/lifecycles.html#site_Lifecycle -->
100         <plugin>
101           <artifactId>maven-site-plugin</artifactId>
102           <version>3.7.1</version>
103         </plugin>
104         <plugin>
105           <artifactId>maven-project-info-reports-plugin</artifactId>
106           <version>3.0.0</version>
107         </plugin>
108       </plugins>
109     </pluginManagement>
110   </build>
111 </project>

 

(2): 建立日志配置文件,在 resources 下建立 log4j.properties,便于打印精确的日志信息

1 log4j.rootLogger=DEBUG,A1
2 log4j.logger.com.edu=DEBUG
3 log4j.logger.org.mybatis=DEBUG
4 log4j.appender.A1=org.apache.log4j.ConsoleAppender
5 log4j.appender.A1.layout=org.apache.log4j.PatternLayout
6 log4j.appender.A1.layout.ConversionPattern=%-d{yyyy-MM-dd HH:mm:ss,SSS} [%t] [%c]-%m%n

(3): 编写一个工具类,主要用于连接 RabbitMQ

 1 package com.edu.util;
 2 
 3 
 4 import com.rabbitmq.client.Connection;
 5 import com.rabbitmq.client.ConnectionFactory;
 6 
 7 /**
 8  * @ClassName ConnectionUtil
 9  * @Deccription 穿件连接的工具类
10  * @Author DZ
11  * @Date 2019/5/4 12:27
12  **/
13 public class ConnectionUtil {
14     /**
15      * 创建连接工具
16      *
17      * @return
18      * @throws Exception
19      */
20     public static Connection getConnection() throws Exception {
21         ConnectionFactory connectionFactory = new ConnectionFactory();
22         connectionFactory.setHost("127.0.0.1");//MQ的服务器
23         connectionFactory.setPort(5672);//默认端口号
24         connectionFactory.setUsername("test");
25         connectionFactory.setPassword("test");
26         connectionFactory.setVirtualHost("/test");
27         return connectionFactory.newConnection();
28     }
29 }

项目总体图如下:

java基础(六):RabbitMQ 入门

4.1.Hello World模式

此模式非常简单,一个生产者对应一个消费者

java基础(六):RabbitMQ 入门首先我们制造一个消息生产者,并发送消息:
 1 package com.edu.hello;
 2 
 3 import com.edu.util.ConnectionUtil;
 4 import com.rabbitmq.client.Channel;
 5 import com.rabbitmq.client.Connection;
 6 
 7 /**
 8  * @ClassName Sender
 9  * @Deccription 创建发送者
10  * @Author DZ
11  * @Date 2019/5/4 12:45
12  **/
13 public class Sender {
14     private final static String QUEUE = "testhello"; //队列的名字
15 
16     public static void main(String[] srgs) throws Exception {
17         //获取连接
18         Connection connection = ConnectionUtil.getConnection();
19         //创建连接
20         Channel channel = connection.createChannel();
21         //声明队列
22         //参数1:队列的名字
23         //参数2:是否持久化队列,我们的队列存在内存中,如果mq重启则丢失。如果为ture,则保存在erlang的数据库中,重启,依旧保存
24         //参数3:是否排外,我们连接关闭后是否自动删除队列,是否私有当前队列,如果私有,其他队列不能访问
25         //参数4:是否自动删除
26         //参数5:我们传入的其他参数
27         channel.queueDeclare(QUEUE, false, false, false, null);
28         //发送内容
29         channel.basicPublish("", QUEUE, null, "要发送的消息".getBytes());
30         //关闭连接
31         channel.close();
32         connection.close();
33     }
34 }

定义一个消息接受者

 1 package com.edu.hello;
 2 
 3 import com.edu.util.ConnectionUtil;
 4 import com.rabbitmq.client.Channel;
 5 import com.rabbitmq.client.Connection;
 6 import com.rabbitmq.client.QueueingConsumer;
 7 
 8 /**
 9  * @ClassName Recver
10  * @Deccription 消息接受者
11  * @Author DZ
12  * @Date 2019/5/4 12:58
13  **/
14 public class Recver {
15     private final static String QUEUE = "testhello";//消息队列的名称
16 
17     public static void main(String[] args) throws Exception {
18         Connection connection = ConnectionUtil.getConnection();
19         Channel channel = connection.createChannel();
20         channel.queueDeclare(QUEUE, false, false, false, null);
21         QueueingConsumer queueingConsumer = new QueueingConsumer(channel);
22         //接受消息,参数2表示自动确认消息
23         channel.basicConsume(QUEUE, true, queueingConsumer);
24         while (true) {
25             //获取消息
26             QueueingConsumer.Delivery delivery = queueingConsumer.nextDelivery();//如果没有消息就等待,有消息就获取消息,并销毁,是一次性的
27             String message = new String(delivery.getBody());
28             System.out.println(message);
29         }
30     }
31 }

此种模式属于“点对点”模式,一个生产者、一个队列、一个消费者,可以运用在聊天室(实际上真实的聊天室比这复杂很多,虽然是“点对点”模式,但是并不是一个生产者,一个队列,一个消费者)

4.2.work queues

java基础(六):RabbitMQ 入门一个生产者对应多个消费者,但是只有一个消费者获得消息

定义消息制造者:

 1 package com.edu.work;
 2 
 3 import com.edu.util.ConnectionUtil;
 4 import com.rabbitmq.client.Channel;
 5 import com.rabbitmq.client.Connection;
 6 
 7 /**
 8  * @ClassName Sender
 9  * @Deccription 创建发送者
10  * @Author DZ
11  * @Date 2019/5/4 12:45
12  **/
13 public class Sender {
14     private final static String QUEUE = "testhellowork"; //队列的名字
15 
16     public static void main(String[] srgs) throws Exception {
17         //获取连接
18         Connection connection = ConnectionUtil.getConnection();
19         //创建连接
20         Channel channel = connection.createChannel();
21         //声明队列
22         //参数1:队列的名字
23         //参数2:是否持久化队列,我们的队列存在内存中,如果mq重启则丢失。如果为ture,则保存在erlang的数据库中,重启,依旧保存
24         //参数3:是否排外,我们连接关闭后是否自动删除队列,是否私有当前队列,如果私有,其他队列不能访问
25         //参数4:是否自动删除
26         //参数5:我们传入的其他参数
27         channel.queueDeclare(QUEUE, false, false, false, null);
28         //发送内容
29         for (int i = 0; i < 100; i++) {
30             channel.basicPublish("", QUEUE, null, ("要发送的消息" + i).getBytes());
31         }
32         //关闭连接
33         channel.close();
34         connection.close();
35     }
36 }
 

定义2个消息消费者

 1 package com.edu.work;
 2 
 3 import com.edu.util.ConnectionUtil;
 4 import com.rabbitmq.client.*;
 5 
 6 import java.io.IOException;
 7 import java.util.Queue;
 8 
 9 /**
10  * @ClassName Recver1
11  * @Deccription 消息接受者
12  * @Author DZ
13  * @Date 2019/5/4 12:58
14  **/
15 public class Recver1 {
16     private final static String QUEUE = "testhellowork";//消息队列的名称
17 
18     public static void main(String[] args) throws Exception {
19         Connection connection = ConnectionUtil.getConnection();
20         final Channel channel = connection.createChannel();
21         channel.queueDeclare(QUEUE, false, false, false, null);
22         //channel.basicQos(1);//告诉服务器,当前消息没有确认之前,不要发送新消息,合理自动分配资源
23         DefaultConsumer defaultConsumer = new DefaultConsumer(channel) {
24             @Override
25             public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
26                 //收到消息时候调用
27                 System.out.println("消费者1收到的消息:" + new String(body));
28                 /*super.handleDelivery(consumerTag, envelope, properties, body);*/
29                 //确认消息
30                 //参数2:false为确认收到消息,ture为拒绝收到消息
31                 channel.basicAck(envelope.getDeliveryTag(), false);
32             }
33         };
34         //注册消费者
35         // 参数2:手动确认,我们收到消息后,需要手动确认,告诉服务器,我们收到消息了
36         channel.basicConsume(QUEUE, false, defaultConsumer);
37     }
38 }
 1 package com.edu.work;
 2 
 3 import com.edu.util.ConnectionUtil;
 4 import com.rabbitmq.client.*;
 5 
 6 import java.io.IOException;
 7 
 8 /**
 9  * @ClassName Recver1
10  * @Deccription 消息接受者
11  * @Author DZ
12  * @Date 2019/5/4 12:58
13  **/
14 public class Recver2 {
15     private final static String QUEUE = "testhellowork";//消息队列的名称
16 
17     public static void main(String[] args) throws Exception {
18         Connection connection = ConnectionUtil.getConnection();
19         final Channel channel = connection.createChannel();
20         channel.queueDeclare(QUEUE, false, false, false, null);
21         //channel.basicQos(1);
22         DefaultConsumer defaultConsumer = new DefaultConsumer(channel) {
23             @Override
24             public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
25                 //收到消息时候调用
26                 System.out.println("消费者2收到的消息:" + new String(body));
27                 /*super.handleDelivery(consumerTag, envelope, properties, body);*/
28                 //确认消息
29                 //参数2:false为确认收到消息,ture为拒绝收到消息
30                 channel.basicAck(envelope.getDeliveryTag(), false);
31             }
32         };
33         //注册消费者
34         // 参数2:手动确认,我们收到消息后,需要手动确认,告诉服务器,我们收到消息了
35         channel.basicConsume(QUEUE, false, defaultConsumer);
36     }
37 }

 

这种模式是最简单的 work 模式,消息发送者,循环发送了100次消息,打印结果如下:

java基础(六):RabbitMQ 入门java基础(六):RabbitMQ 入门可以看出,消息消费者消费到的消息是替换的,即一个消息只被消费了一次,且两个消费者各消费了50条消息。这里有个弊端,消息消费者发布消息的时候,无论消费者的消费能力如何(电脑的内存等硬件),消息只会均匀分布给各个消费者(可以给2个消费者 sleep 下,结果还是这样)。有没有什么方式可以让消息自动分配(按照电脑的硬件,能者多劳),答案是可以的,只需要增加 channel.basicQos(1);java基础(六):RabbitMQ 入门此方案可以用来进行负载均衡,抢红包等场景

4.3.public模式

java基础(六):RabbitMQ 入门一个消费者将消息首先发送到交换器,交换器绑定到多个队列,然后被监听该队列的消费者所接收并消费。X 表示交换器,在 RabbitMQ 中,交换器主要有四种类型: direct、fanout、topic、headers,这里的交换器是 fanout,其它类型的交换机自行谷歌,主要区别是交换机的匹配方式发生了变化。

定义消息发布者

 1 package com.edu.publish;
 2 
 3 import com.edu.util.ConnectionUtil;
 4 import com.rabbitmq.client.Channel;
 5 import com.rabbitmq.client.Connection;
 6 
 7 /**
 8  * @ClassName Sender
 9  * @Deccription TODO
10  * @Author DZ
11  * @Date 2019/5/4 14:43
12  **/
13 public class Sender {
14     private final static String EXCHANGE_NAME = "testexchange";//定义交换机名字
15 
16     public static void main(String[] args) throws Exception {
17         Connection connection = ConnectionUtil.getConnection();
18         Channel channel = connection.createChannel();
19         //声明交换机
20         //定义一个交换机,类型为fanout,也就是发布订阅者模式
21         channel.exchangeDeclare(EXCHANGE_NAME, "fanout");
22         //发布订阅模式,因为消息是先发布到交换机中,而交换机是没有保存功能的,所以如果没有消费者,消息会丢失
23         channel.basicPublish(EXCHANGE_NAME, "", null, "发布订阅模式的消息".getBytes());
24         channel.close();
25         connection.close();
26     }
27 }
定义2个消息消费者
 1 package com.edu.publish;
 2 
 3 import com.edu.util.ConnectionUtil;
 4 import com.rabbitmq.client.*;
 5 
 6 import java.io.IOException;
 7 
 8 /**
 9  * @ClassName Recver1
10  * @Deccription TODO
11  * @Author DZ
12  * @Date 2019/5/4 14:49
13  **/
14 public class Recver1 {
15     //定义交换机
16     private final static String EXCHANGE_NAME = "testexchange";
17     private final static String QUEUE = "testpubqueue1";
18 
19     public static void main(String[] args) throws Exception {
20         Connection connection = ConnectionUtil.getConnection();
21         final Channel channel = connection.createChannel();
22         channel.queueDeclare(QUEUE, false, false, false, null);
23         //绑定队列到交换机
24         channel.queueBind(QUEUE, EXCHANGE_NAME, "");
25         channel.basicQos(1);
26         DefaultConsumer defaultConsumer = new DefaultConsumer(channel) {
27             @Override
28             public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
29                 /* super.handleDelivery(consumerTag, envelope, properties, body);*/
30                 System.out.println("消费者1:" + new String(body));
31                 channel.basicAck(envelope.getDeliveryTag(), false);
32             }
33         };
34         channel.basicConsume(QUEUE, false, defaultConsumer);
35     }
36 }
 1 package com.edu.publish;
 2 
 3 import com.edu.util.ConnectionUtil;
 4 import com.rabbitmq.client.*;
 5 
 6 import java.io.IOException;
 7 
 8 /**
 9  * @ClassName Recver1
10  * @Deccription TODO
11  * @Author DZ
12  * @Date 2019/5/4 14:49
13  **/
14 public class Recver2 {
15     //定义交换机
16     private final static String EXCHANGE_NAME = "testexchange";
17     private final static String QUEUE = "testpubqueue2";
18 
19     public static void main(String[] args) throws Exception {
20         Connection connection = ConnectionUtil.getConnection();
21         final Channel channel = connection.createChannel();
22         channel.queueDeclare(QUEUE, false, false, false, null);
23         //绑定队列到交换机
24         channel.queueBind(QUEUE, EXCHANGE_NAME, "");
25         channel.basicQos(1);
26         DefaultConsumer defaultConsumer = new DefaultConsumer(channel) {
27             @Override
28             public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
29                 /* super.handleDelivery(consumerTag, envelope, properties, body);*/
30                 System.out.println("消费者2:" + new String(body));
31                 channel.basicAck(envelope.getDeliveryTag(), false);
32             }
33         };
34         channel.basicConsume(QUEUE, false, defaultConsumer);
35     }
36 }

 

消费者1 和消费者2 都监听了被同一个交换器绑定的队列,因此消息被同时消费到了。如果消息发送到没有队列绑定的交换器时,消息将丢失,因为交换器没有存储消息的能力,消息只能存储在队列中。

应用场景:比如一个商城系统需要在管理员上传商品新的图片时,前台系统必须更新图片,日志系统必须记录相应的日志,那么就可以将两个队列绑定到图片上传交换器上,一个用于前台系统更新图片,另一个用于日志系统记录日志。

4.4.routing

java基础(六):RabbitMQ 入门生产者将消息发送到 direct 交换器,在绑定队列和交换器的时候有一个路由 key,生产者发送的消息会指定一个路由 key,那么消息只会发送到相应 key 相同的队列,接着监听该队列的消费者消费消息。

定义消息发布者

 1 package com.edu.route;
 2 
 3 import com.edu.util.ConnectionUtil;
 4 import com.rabbitmq.client.Channel;
 5 import com.rabbitmq.client.Connection;
 6 
 7 /**
 8  * @ClassName Sender
 9  * @Deccription TODO
10  * @Author DZ
11  * @Date 2019/5/4 15:05
12  **/
13 public class Sender {
14     private final static String EXCANGE_NAME = "testroute";
15 
16     public static void main(String[] args) throws Exception {
17         Connection connection = ConnectionUtil.getConnection();
18         Channel channel = connection.createChannel();
19         //定义路由格式的交换机
20         channel.exchangeDeclare(EXCANGE_NAME, "direct");
21         channel.basicPublish(EXCANGE_NAME, "key2", null, "路由模式的消息".getBytes());
22         channel.close();
23         connection.close();
24     }
25 }
 1 package com.edu.route;
 2 
 3 import com.edu.util.ConnectionUtil;
 4 import com.rabbitmq.client.*;
 5 
 6 import java.io.IOException;
 7 
 8 /**
 9  * @ClassName Recver1
10  * @Deccription TODO
11  * @Author DZ
12  * @Date 2019/5/4 14:49
13  **/
14 public class Recver1 {
15     //定义交换机
16     private final static String EXCHANGE_NAME = "testroute";
17     private final static String QUEUE = "testroute1queue";
18 
19     public static void main(String[] args) throws Exception {
20         Connection connection = ConnectionUtil.getConnection();
21         final Channel channel = connection.createChannel();
22         channel.queueDeclare(QUEUE, false, false, false, null);
23         //绑定队列到交换机
24         //参数3:绑定到交换机指定的路由的名字
25         channel.queueBind(QUEUE, EXCHANGE_NAME, "key1");
26         //如果需要绑定多个路由,再绑定一次即可
27         channel.queueBind(QUEUE, EXCHANGE_NAME, "key2");
28         channel.basicQos(1);
29         DefaultConsumer defaultConsumer = new DefaultConsumer(channel) {
30             @Override
31             public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
32                 /* super.handleDelivery(consumerTag, envelope, properties, body);*/
33                 System.out.println("消费者1:" + new String(body));
34                 channel.basicAck(envelope.getDeliveryTag(), false);
35             }
36         };
37         channel.basicConsume(QUEUE, false, defaultConsumer);
38     }
39 }
 1 package com.edu.route;
 2 
 3 import com.edu.util.ConnectionUtil;
 4 import com.rabbitmq.client.*;
 5 
 6 import java.io.IOException;
 7 
 8 /**
 9  * @ClassName Recver1
10  * @Deccription TODO
11  * @Author DZ
12  * @Date 2019/5/4 14:49
13  **/
14 public class Recver2 {
15     //定义交换机
16     private final static String EXCHANGE_NAME = "testroute";
17     private final static String QUEUE = "testroute2queue";
18 
19     public static void main(String[] args) throws Exception {
20         Connection connection = ConnectionUtil.getConnection();
21         final Channel channel = connection.createChannel();
22         channel.queueDeclare(QUEUE, false, false, false, null);
23         //绑定队列到交换机
24         //参数3:绑定到交换机指定的路由的名字
25         channel.queueBind(QUEUE, EXCHANGE_NAME, "key1");
26         //如果需要绑定多个路由,再绑定一次即可
27         channel.queueBind(QUEUE, EXCHANGE_NAME, "key3");
28         channel.basicQos(1);
29         DefaultConsumer defaultConsumer = new DefaultConsumer(channel) {
30             @Override
31             public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
32                 /* super.handleDelivery(consumerTag, envelope, properties, body);*/
33                 System.out.println("消费者2:" + new String(body));
34                 channel.basicAck(envelope.getDeliveryTag(), false);
35             }
36         };
37         channel.basicConsume(QUEUE, false, defaultConsumer);
38     }
39 }

 

应用场景:利用消费者能够有选择性的接收消息的特性,比如我们商城系统的后台管理系统对于商品进行修改、删除、新增操作都需要更新前台系统的界面展示,而查询操作确不需要,那么这两个队列分开接收消息就比较好。

4.5.Topic

java基础(六):RabbitMQ 入门上面的路由模式是根据路由key进行完整的匹配(完全相等才发送消息),这里的通配符模式通俗的来讲就是模糊匹配。符号 “#” 表示匹配一个或多个词,符号 “*” 表示匹配一个词。实际上 Topic 模式是 routing 模式的扩展
 1 package com.edu.topic;
 2 
 3 import com.edu.util.ConnectionUtil;
 4 import com.rabbitmq.client.Channel;
 5 import com.rabbitmq.client.Connection;
 6 
 7 /**
 8  * @ClassName Sender
 9  * @Deccription TODO
10  * @Author DZ
11  * @Date 2019/5/4 15:19
12  **/
13 public class Sender {
14     private final static String EXCANGE_NAME = "testtopexchange";
15 
16     public static void main(String[] args) throws Exception {
17         Connection connection = ConnectionUtil.getConnection();
18         Channel channel = connection.createChannel();
19         channel.exchangeDeclare(EXCANGE_NAME, "topic");
20         channel.basicPublish(EXCANGE_NAME, "abc.adb.1", null, "topic模式消息发送者:".getBytes());
21         channel.close();
22         connection.close();
23     }
24 }
 1 package com.edu.topic;
 2 
 3 import com.edu.util.ConnectionUtil;
 4 import com.rabbitmq.client.*;
 5 
 6 import java.io.IOException;
 7 
 8 /**
 9  * @ClassName Recver1
10  * @Deccription TODO
11  * @Author DZ
12  * @Date 2019/5/4 14:49
13  **/
14 public class Recver1 {
15     //定义交换机
16     private final static String EXCHANGE_NAME = "testtopexchange";
17     private final static String QUEUE = "testtopic1queue";
18 
19     public static void main(String[] args) throws Exception {
20         Connection connection = ConnectionUtil.getConnection();
21         final Channel channel = connection.createChannel();
22         channel.queueDeclare(QUEUE, false, false, false, null);
23         //绑定队列到交换机
24         //参数3:绑定到交换机指定的路由的名字
25         channel.queueBind(QUEUE, EXCHANGE_NAME, "key.*");
26         //如果需要绑定多个路由,再绑定一次即可
27         channel.queueBind(QUEUE, EXCHANGE_NAME, "abc.*");
28         channel.basicQos(1);
29         DefaultConsumer defaultConsumer = new DefaultConsumer(channel) {
30             @Override
31             public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
32                 /* super.handleDelivery(consumerTag, envelope, properties, body);*/
33                 System.out.println("消费者1:" + new String(body));
34                 channel.basicAck(envelope.getDeliveryTag(), false);
35             }
36         };
37         channel.basicConsume(QUEUE, false, defaultConsumer);
38     }
39 }
 1 package com.edu.topic;
 2 
 3 import com.edu.util.ConnectionUtil;
 4 import com.rabbitmq.client.*;
 5 
 6 import java.io.IOException;
 7 
 8 /**
 9  * @ClassName Recver1
10  * @Deccription TODO
11  * @Author DZ
12  * @Date 2019/5/4 14:49
13  **/
14 public class Recver2 {
15     //定义交换机
16     private final static String EXCHANGE_NAME = "testtopexchange";
17     private final static String QUEUE = "testtopic2queue";
18 
19     public static void main(String[] args) throws Exception {
20         Connection connection = ConnectionUtil.getConnection();
21         final Channel channel = connection.createChannel();
22         channel.queueDeclare(QUEUE, false, false, false, null);
23         //绑定队列到交换机
24         //参数3:绑定到交换机指定的路由的名字
25         channel.queueBind(QUEUE, EXCHANGE_NAME, "key.*");
26         //如果需要绑定多个路由,再绑定一次即可
27         channel.queueBind(QUEUE, EXCHANGE_NAME, "abc.#");
28         channel.basicQos(1);
29         DefaultConsumer defaultConsumer = new DefaultConsumer(channel) {
30             @Override
31             public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
32                 /* super.handleDelivery(consumerTag, envelope, properties, body);*/
33                 System.out.println("消费者2:" + new String(body));
34                 channel.basicAck(envelope.getDeliveryTag(), false);
35             }
36         };
37         channel.basicConsume(QUEUE, false, defaultConsumer);
38     }
39 }

 

第六种模式是将上述的模式集成其它的框架,进行远程访问,这里我们将集成 Spring 实现 RCP 远程模式的使用

5.Spring 集成 RabbitMQ

5.1.自动集成 Spring

编写spring的配置,此配置文件的目的是将 Spring 与 RabbitMQ 进行整合,实际上就是将 MQ 的相关信息(连接,队列,交换机……)通过XML配置的方式实现

 1 <beans xmlns="http://www.springframework.org/schema/beans"
 2        xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
 3        xmlns:rabbit="http://www.springframework.org/schema/rabbit"
 4        xsi:schemaLocation="http://www.springframework.org/schema/rabbit
 5        http://www.springframework.org/schema/rabbit/spring-rabbit-1.7.xsd
 6        http://www.springframework.org/schema/beans
 7        http://www.springframework.org/schema/beans/spring-beans-4.3.xsd">
 8     <!--定义连接工厂-->
 9     <rabbit:connection-factory id="connectionFactory" host="127.0.0.1" port="5672" username="test" password="test"
10                                virtual-host="/test"/>
11     <!--
12      定义模板
13      第三个参数,决定消息发送到哪里,如果为exchange,则发送到交换机;如果为queue,则发送到队列
14     -->
15     <rabbit:template id="template" connection-factory="connectionFactory" exchange="fanoutExchange"/>
16     <rabbit:admin connection-factory="connectionFactory"/>
17     <!--定义队列-->
18     <rabbit:queue name="myQueue" auto-declare="true"/>
19     <!--定义交换机-->
20     <rabbit:fanout-exchange name="fanoutExchange" auto-declare="true">
21         <!--将消息绑定到交换机-->
22         <rabbit:bindings>
23             <rabbit:binding queue="myQueue">
24 
25             </rabbit:binding>
26         </rabbit:bindings>
27     </rabbit:fanout-exchange>
28     <!--定义监听器,收到消息会执行-->
29     <rabbit:listener-container connection-factory="connectionFactory">
30        <!-- 定义监听的类和方法-->
31         <rabbit:listener ref="consumer" method="test" queue-names="myQueue"/>
32     </rabbit:listener-container>
33     <!--定义消费者-->
34     <bean id="consumer" class="com.edu.spring.MyConsumer"/>
35 
36 </beans>

 

生产者:

 1 package com.edu.spring;
 2 
 3 import org.springframework.amqp.rabbit.core.RabbitTemplate;
 4 import org.springframework.context.ApplicationContext;
 5 import org.springframework.context.support.ClassPathXmlApplicationContext;
 6 
 7 /**
 8  * @ClassName SpringTest
 9  * @Deccription TODO
10  * @Author DZ
11  * @Date 2019/5/4 18:40
12  **/
13 public class SpringTest {
14     public static void main(String[] args) throws Exception {
15         ApplicationContext applicationContext = new ClassPathXmlApplicationContext("classpath:applicationContext.xml");
16         RabbitTemplate rabbitTemplate = applicationContext.getBean(RabbitTemplate.class);
17         rabbitTemplate.convertAndSend("Spring的消息");
18         ((ClassPathXmlApplicationContext) applicationContext).destroy();
19     }
20 }

消费者

 1 package com.edu.spring;
 2 
 3 /**
 4  * @ClassName MyConsumer
 5  * @Deccription TODO
 6  * @Author DZ
 7  * @Date 2019/5/4 18:35
 8  **/
 9 public class MyConsumer {
10     /*用于接收消息*/
11     public void test(String message) {
12         System.err.println(message);
13     }
14 }
 

集成Spring主要是在xml中实现了队列和交换机的创建。

java基础(六):RabbitMQ 入门最好能理解上面的图。理解后,以后写相关的代码,直接去网上 copy 一份配置文件,然后根据自己项目的情况进行修改。如果不能理解,就不知道如何修改出现错误后不知道错误出现在什么地方。

5.2.手动模式

手动模式,主要增加MQ的回调操作,MQ消息失败或者成功就有相应的回调信息,增强系统的健壮性,一旦产生异常,很快就能定位到异常的位置,所以在实际开发中,一般都这种方式

创建xml配置文件

 1 <beans xmlns="http://www.springframework.org/schema/beans"
 2        xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
 3        xmlns:rabbit="http://www.springframework.org/schema/rabbit"
 4        xmlns:context="http://www.springframework.org/schema/context"
 5        xsi:schemaLocation="http://www.springframework.org/schema/rabbit
 6        http://www.springframework.org/schema/rabbit/spring-rabbit-1.7.xsd
 7        http://www.springframework.org/schema/beans
 8        http://www.springframework.org/schema/beans/spring-beans-4.3.xsd
 9        http://www.springframework.org/schema/context
10        http://www.springframework.org/schema/context/spring-context-4.3.xsd">
11     <context:component-scan base-package="com.edu.spring2"/>
12     <bean id="jsonMessageConverter" class="org.springframework.amqp.support.converter.Jackson2JsonMessageConverter"/>
13 
14     <!--
15     定义连接工厂
16     publisher-confirms为ture,确认失败等回调才会执行
17     -->
18     <rabbit:connection-factory id="connectionFactory" host="127.0.0.1" port="5672" username="test" password="test"
19                                virtual-host="/test" publisher-confirms="true"/>
20 
21     <rabbit:admin connection-factory="connectionFactory"/>
22     <rabbit:template id="amqpTemplate" connection-factory="connectionFactory" confirm-callback="confirmCallBackListener"
23                      return-callback="returnCallBackListener"
24                      mandatory="true"/>
25     <!--定义队列-->
26     <rabbit:queue name="myQueue" auto-declare="true"/>
27     <!--定义交换机-->
28     <rabbit:direct-exchange name="DIRECT_EX" id="DIRECT_EX">
29         <!--将消息绑定到交换机-->
30         <rabbit:bindings>
31             <rabbit:binding queue="myQueue">
32 
33             </rabbit:binding>
34         </rabbit:bindings>
35     </rabbit:direct-exchange>
36     <!--定义监听器,收到消息会执行-->
37     <rabbit:listener-container connection-factory="connectionFactory" acknowledge="manual">
38         <!-- 定义监听的类和方法-->
39         <rabbit:listener queues="myQueue" ref="receiveConfirmTestListener"/>
40     </rabbit:listener-container>
41 
42 </beans>
 

创建回调监听函数

 1 package com.edu.spring2;
 2 
 3 import org.springframework.amqp.rabbit.core.RabbitTemplate;
 4 import org.springframework.amqp.rabbit.support.CorrelationData;
 5 import org.springframework.stereotype.Component;
 6 
 7 /**
 8  * @ClassName ConfirmCallBackListener
 9  * @Deccription TODO
10  * @Author DZ
11  * @Date 2019/5/4 22:26
12  **/
13 @Component("confirmCallBackListener")
14 public class ConfirmCallBackListener implements RabbitTemplate.ConfirmCallback {
15 
16     @Override
17     public void confirm(CorrelationData correlationData, boolean ack, String cause) {
18         System.out.println("确认回调 ack==" + ack + "回调原因==" + cause);
19     }
20 }
 1 package com.edu.spring2;
 2 
 3 import com.rabbitmq.client.Channel;
 4 import org.springframework.amqp.core.Message;
 5 import org.springframework.amqp.rabbit.core.ChannelAwareMessageListener;
 6 import org.springframework.stereotype.Component;
 7 
 8 /**
 9  * @ClassName ReceiveConfirmTestListener
10  * @Deccription TODO
11  * @Author DZ
12  * @Date 2019/5/4 22:24
13  **/
14 @Component("receiveConfirmTestListener")
15 public class ReceiveConfirmTestListener implements ChannelAwareMessageListener {
16     /**
17      * 收到消息时,执行的监听
18      *
19      * @param message
20      * @param channel
21      * @throws Exception
22      */
23     @Override
24     public void onMessage(Message message, Channel channel) throws Exception {
25         System.out.println(("消费者收到了消息" + message));
26         channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);
27     }
28 }
 1 package com.edu.spring2;
 2 
 3 import org.springframework.amqp.core.Message;
 4 import org.springframework.amqp.rabbit.core.RabbitTemplate;
 5 import org.springframework.stereotype.Component;
 6 
 7 /**
 8  * @ClassName ReturnCallBackListener
 9  * @Deccription TODO
10  * @Author DZ
11  * @Date 2019/5/4 22:28
12  **/
13 @Component("returnCallBackListener")
14 public class ReturnCallBackListener implements RabbitTemplate.ReturnCallback {
15     @Override
16     public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {
17         System.out.println("失败回调" + message);
18     }
19 }

 

回调函数的配置来自 XML

java基础(六):RabbitMQ 入门
创建发送消息的工具类
 1 package com.edu.spring2;
 2 
 3 import org.springframework.amqp.core.AmqpTemplate;
 4 import org.springframework.beans.factory.annotation.Autowired;
 5 import org.springframework.stereotype.Component;
 6 
 7 /**
 8  * @ClassName PublicUtil
 9  * @Deccription TODO
10  * @Author DZ
11  * @Date 2019/5/4 22:30
12  **/
13 @Component("publicUtil")
14 public class PublicUtil {
15     @Autowired
16     private AmqpTemplate amqpTemplate;
17 
18     public void send(String excange, String routingkey, Object message) {
19         amqpTemplate.convertAndSend(excange, routingkey, message);
20     }
21 }

 

 

创建测试类

 1 package com.edu.spring2;
 2 
 3 import org.junit.Test;
 4 import org.junit.runner.RunWith;
 5 import org.springframework.beans.factory.annotation.Autowired;
 6 import org.springframework.test.context.ContextConfiguration;
 7 import org.springframework.test.context.junit4.SpringJUnit4ClassRunner;
 8 
 9 /**
10  * @ClassName TestMain
11  * @Deccription TODO
12  * @Author DZ
13  * @Date 2019/5/4 22:32
14  **/
15 @RunWith(SpringJUnit4ClassRunner.class)
16 @ContextConfiguration(locations = {"classpath:applicationContext2.xml"})
17 public class TestMain {
18     @Autowired
19     private PublicUtil publicUtil;
20     private static String exChange = "DIRECT_EX";//交换机
21     private static String queue = "myQueue";
22 
23     /**
24      * exChange和queue均正确
25      * confirm会执行,ack = ture
26      * 消息正常接收(接收消息确认方法正常执行)
27      */
28     @Test
29     public void test1() throws Exception {
30         publicUtil.send(exChange, queue, "测试1,队列和交换机均正确");
31     }
32     /**
33      * exChange错误,queue正确
34      * confirm执行,ack=false
35      * 消息无法接收(接收消息确认方法不能执行)
36      */
37     @Test
38     public void test2() throws Exception {
39         publicUtil.send(exChange + "1", queue, "测试2,队列正确,交换机错误");
40     }
41     /**
42      * exChange正常,queue错误
43      * return执行
44      * confirm执行,ack=ture
45      */
46     @Test
47     public void test3() throws Exception {
48         publicUtil.send(exChange, queue + "1", "测试2,队列错误,交换机正确");
49     }
50     /**
51      * exChange错误,queue错误
52      * confirm执行,ack=false
53      */
54     @Test
55     public void test4() throws Exception {
56         publicUtil.send(exChange + "1", queue + "1", "测试2,队列错误,交换机错误");
57     }
58 }
 

测试结果如下:

  • test1:exChange和queue均正确

    java基础(六):RabbitMQ 入门
  • confirm会执行,ack=ture;能正常收到消息(接收消息的方法正常执行)
  • test2:exChange错误,queue正确

java基础(六):RabbitMQ 入门confirm执行,ack=false;不能正常接收到消息
  • test3:exChange正确,queue错误
java基础(六):RabbitMQ 入门confirm执行,ack=ture;return执行;不能接收到消息
  • test4:exChange和queue均错误
java基础(六):RabbitMQ 入门confirm执行,ack=false;不能接收消息

上述结论及代码如下图:

java基础(六):RabbitMQ 入门

根据上述的测试结果,我们可以根据回调函数的返回结果,查看MQ的错误出现在那里。根据上述结论,我们可以对3个回调函数做如下处理:

  • 类 ReceiveConfirmTestListener 中的onMessage方法主要用于接收从 RabbitMQ 推送过来的消息,并对消息做相应的逻辑处理

  • 类 ConfirmCallBackListener 中的 confirm 方法主要用于检查交换机(exChange),当 ack=false,交换机可能错误

  • 类 ReturnCallBackListener 中的 returnedMessage 方法用于检查队列(queue),当此方法执行时,队列可能错误

java基础(六):RabbitMQ 入门所以3个相应的方法可以做如下调整:java基础(六):RabbitMQ 入门java基础(六):RabbitMQ 入门java基础(六):RabbitMQ 入门

实际上,在真实项目中,上面3个方法也是按照这3个逻辑进行设计的。当然这3个方法中还可以加入更多的日志消息,和逻辑处理业务。

6.参考

https://blog.csdn.net/liu911025/article/details/80460182

https://blog.csdn.net/lyhkmm/article/details/78775369

https://blog.csdn.net/vbirdbest/article/details/78670550

https://blog.csdn.net/vbirdbest/article/details/78670550

https://www.rabbitmq.com/getstarted.html

相关文章: