建议先了解为什么项目要使用 MQ 消息队列,MQ 消息队列有什么优点,如果在业务逻辑上没有此种需求,建议不要使用中间件。中间件对系统的性能做优化的同时,同时增加了系统的复杂性也维护难易度;其次,需要了解各种常见的 MQ 消息队列有什么区别,以便在相同的成本下选择一种最合适本系统的技术。
本文主要讨论 RabbitMQ,从3月底接触一个项目使用了 RabbitMQ,就开始着手学习,主要通过视频和博客学习了一个月,基本明白了 RabbitMQ 的应用,其它的 MQ 队列还不清楚,其底层技术还有待学习,以下是我目前的学习心得。
RabbitMQ 是基于 Erlang 语言写的,所以首先安装 Erlang,本例是在 Windows 上安装,也可以选择在 Linux 上安装,机器上没有虚拟机,直接在 Windows 上操作,建议在 Linux 上安装。官方下载 Erlang 软件,我下载最新版本 21.3。安装过程很简单,直接 Next 到底。 Linux 安装自行谷歌。如下图:
2.安装 RabbitMQ
在官方下载,选择最新版本 3.7。安装过程很简单,直接 Next 到底。如下图:
执行 rabbitmq-server start 命令,启动服务。本地登陆并创建用户,如下图:
1、 超级管理员(administrator)
可登陆管理控制台,可查看所有的信息,并且可以对用户,策略(policy)进行操作。
2、 监控者(monitoring)
可登陆管理控制台,同时可以查看rabbitmq节点的相关信息(进程数,内存使用情况,磁盘使用情况等)
3、 策略制定者(policymaker)
可登陆管理控制台, 同时可以对policy进行管理。但无法查看节点的相关信息(上图红框标识的部分)。
4、 普通管理者(management)
仅可登陆管理控制台,无法看到节点信息,也无法对策略进行管理。
5、 其他
无法登陆管理控制台,通常就是普通的生产者和消费者。
4.JAVA 操作RabbitMQ
参考 RabbitMQ 官网,一共分为6个模式
RabbitMQ 是一个消息代理,实际上,它接收生产者产生的消息,然后将消息传递给消费者。在这个过程中,它可以路由、缓冲、持久化等,在传输过程中,主要又三部分组成。生产者:发送消息的一端
(1): 准备必要的 Pom 文件,导入相应的 jar 包,
1 <?xml version="1.0" encoding="UTF-8"?> 2 3 <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" 4 xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> 5 <modelVersion>4.0.0</modelVersion> 6 7 <groupId>com.edu</groupId> 8 <artifactId>rabbitmqdemo</artifactId> 9 <version>1.0</version> 10 11 <name>rabbitmqdemo</name> 12 <!-- FIXME change it to the project's website --> 13 <url>http://www.example.com</url> 14 15 <properties> 16 <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding> 17 <maven.compiler.source>1.7</maven.compiler.source> 18 <maven.compiler.target>1.7</maven.compiler.target> 19 </properties> 20 21 <dependencies> 22 <!--测试包--> 23 <dependency> 24 <groupId>junit</groupId> 25 <artifactId>junit</artifactId> 26 <version>4.11</version> 27 <scope>test</scope> 28 </dependency> 29 <!--mq客户端--> 30 <dependency> 31 <groupId>com.rabbitmq</groupId> 32 <artifactId>amqp-client</artifactId> 33 <version>4.5.0</version> 34 </dependency> 35 <!--日志--> 36 <dependency> 37 <groupId>org.slf4j</groupId> 38 <artifactId>slf4j-log4j12</artifactId> 39 <version>1.7.25</version> 40 </dependency> 41 <!--工具包--> 42 <dependency> 43 <groupId>org.apache.commons</groupId> 44 <artifactId>commons-lang3</artifactId> 45 <version>3.3.2</version> 46 </dependency> 47 <!--spring集成--> 48 <dependency> 49 <groupId>org.springframework.amqp</groupId> 50 <artifactId>spring-rabbit</artifactId> 51 <version>1.7.6.RELEASE</version> 52 </dependency> 53 <dependency> 54 <groupId>org.springframework</groupId> 55 <artifactId>spring-test</artifactId> 56 <version>4.3.7.RELEASE</version> 57 </dependency> 58 <dependency> 59 <groupId>junit</groupId> 60 <artifactId>junit</artifactId> 61 <version>RELEASE</version> 62 <scope>compile</scope> 63 </dependency> 64 </dependencies> 65 66 <build> 67 <pluginManagement><!-- lock down plugins versions to avoid using Maven defaults (may be moved to parent pom) --> 68 <plugins> 69 <!-- clean lifecycle, see https://maven.apache.org/ref/current/maven-core/lifecycles.html#clean_Lifecycle --> 70 <plugin> 71 <artifactId>maven-clean-plugin</artifactId> 72 <version>3.1.0</version> 73 </plugin> 74 <!-- default lifecycle, jar packaging: see https://maven.apache.org/ref/current/maven-core/default-bindings.html#Plugin_bindings_for_jar_packaging --> 75 <plugin> 76 <artifactId>maven-resources-plugin</artifactId> 77 <version>3.0.2</version> 78 </plugin> 79 <plugin> 80 <artifactId>maven-compiler-plugin</artifactId> 81 <version>3.8.0</version> 82 </plugin> 83 <plugin> 84 <artifactId>maven-surefire-plugin</artifactId> 85 <version>2.22.1</version> 86 </plugin> 87 <plugin> 88 <artifactId>maven-jar-plugin</artifactId> 89 <version>3.0.2</version> 90 </plugin> 91 <plugin> 92 <artifactId>maven-install-plugin</artifactId> 93 <version>2.5.2</version> 94 </plugin> 95 <plugin> 96 <artifactId>maven-deploy-plugin</artifactId> 97 <version>2.8.2</version> 98 </plugin> 99 <!-- site lifecycle, see https://maven.apache.org/ref/current/maven-core/lifecycles.html#site_Lifecycle --> 100 <plugin> 101 <artifactId>maven-site-plugin</artifactId> 102 <version>3.7.1</version> 103 </plugin> 104 <plugin> 105 <artifactId>maven-project-info-reports-plugin</artifactId> 106 <version>3.0.0</version> 107 </plugin> 108 </plugins> 109 </pluginManagement> 110 </build> 111 </project>
(2): 建立日志配置文件,在 resources 下建立 log4j.properties,便于打印精确的日志信息
1 log4j.rootLogger=DEBUG,A1 2 log4j.logger.com.edu=DEBUG 3 log4j.logger.org.mybatis=DEBUG 4 log4j.appender.A1=org.apache.log4j.ConsoleAppender 5 log4j.appender.A1.layout=org.apache.log4j.PatternLayout 6 log4j.appender.A1.layout.ConversionPattern=%-d{yyyy-MM-dd HH:mm:ss,SSS} [%t] [%c]-%m%n
(3): 编写一个工具类,主要用于连接 RabbitMQ
1 package com.edu.util; 2 3 4 import com.rabbitmq.client.Connection; 5 import com.rabbitmq.client.ConnectionFactory; 6 7 /** 8 * @ClassName ConnectionUtil 9 * @Deccription 穿件连接的工具类 10 * @Author DZ 11 * @Date 2019/5/4 12:27 12 **/ 13 public class ConnectionUtil { 14 /** 15 * 创建连接工具 16 * 17 * @return 18 * @throws Exception 19 */ 20 public static Connection getConnection() throws Exception { 21 ConnectionFactory connectionFactory = new ConnectionFactory(); 22 connectionFactory.setHost("127.0.0.1");//MQ的服务器 23 connectionFactory.setPort(5672);//默认端口号 24 connectionFactory.setUsername("test"); 25 connectionFactory.setPassword("test"); 26 connectionFactory.setVirtualHost("/test"); 27 return connectionFactory.newConnection(); 28 } 29 }
项目总体图如下:
4.1.Hello World模式
此模式非常简单,一个生产者对应一个消费者
1 package com.edu.hello; 2 3 import com.edu.util.ConnectionUtil; 4 import com.rabbitmq.client.Channel; 5 import com.rabbitmq.client.Connection; 6 7 /** 8 * @ClassName Sender 9 * @Deccription 创建发送者 10 * @Author DZ 11 * @Date 2019/5/4 12:45 12 **/ 13 public class Sender { 14 private final static String QUEUE = "testhello"; //队列的名字 15 16 public static void main(String[] srgs) throws Exception { 17 //获取连接 18 Connection connection = ConnectionUtil.getConnection(); 19 //创建连接 20 Channel channel = connection.createChannel(); 21 //声明队列 22 //参数1:队列的名字 23 //参数2:是否持久化队列,我们的队列存在内存中,如果mq重启则丢失。如果为ture,则保存在erlang的数据库中,重启,依旧保存 24 //参数3:是否排外,我们连接关闭后是否自动删除队列,是否私有当前队列,如果私有,其他队列不能访问 25 //参数4:是否自动删除 26 //参数5:我们传入的其他参数 27 channel.queueDeclare(QUEUE, false, false, false, null); 28 //发送内容 29 channel.basicPublish("", QUEUE, null, "要发送的消息".getBytes()); 30 //关闭连接 31 channel.close(); 32 connection.close(); 33 } 34 }
定义一个消息接受者
1 package com.edu.hello; 2 3 import com.edu.util.ConnectionUtil; 4 import com.rabbitmq.client.Channel; 5 import com.rabbitmq.client.Connection; 6 import com.rabbitmq.client.QueueingConsumer; 7 8 /** 9 * @ClassName Recver 10 * @Deccription 消息接受者 11 * @Author DZ 12 * @Date 2019/5/4 12:58 13 **/ 14 public class Recver { 15 private final static String QUEUE = "testhello";//消息队列的名称 16 17 public static void main(String[] args) throws Exception { 18 Connection connection = ConnectionUtil.getConnection(); 19 Channel channel = connection.createChannel(); 20 channel.queueDeclare(QUEUE, false, false, false, null); 21 QueueingConsumer queueingConsumer = new QueueingConsumer(channel); 22 //接受消息,参数2表示自动确认消息 23 channel.basicConsume(QUEUE, true, queueingConsumer); 24 while (true) { 25 //获取消息 26 QueueingConsumer.Delivery delivery = queueingConsumer.nextDelivery();//如果没有消息就等待,有消息就获取消息,并销毁,是一次性的 27 String message = new String(delivery.getBody()); 28 System.out.println(message); 29 } 30 } 31 }
此种模式属于“点对点”模式,一个生产者、一个队列、一个消费者,可以运用在聊天室(实际上真实的聊天室比这复杂很多,虽然是“点对点”模式,但是并不是一个生产者,一个队列,一个消费者)
4.2.work queues
定义消息制造者:
1 package com.edu.work; 2 3 import com.edu.util.ConnectionUtil; 4 import com.rabbitmq.client.Channel; 5 import com.rabbitmq.client.Connection; 6 7 /** 8 * @ClassName Sender 9 * @Deccription 创建发送者 10 * @Author DZ 11 * @Date 2019/5/4 12:45 12 **/ 13 public class Sender { 14 private final static String QUEUE = "testhellowork"; //队列的名字 15 16 public static void main(String[] srgs) throws Exception { 17 //获取连接 18 Connection connection = ConnectionUtil.getConnection(); 19 //创建连接 20 Channel channel = connection.createChannel(); 21 //声明队列 22 //参数1:队列的名字 23 //参数2:是否持久化队列,我们的队列存在内存中,如果mq重启则丢失。如果为ture,则保存在erlang的数据库中,重启,依旧保存 24 //参数3:是否排外,我们连接关闭后是否自动删除队列,是否私有当前队列,如果私有,其他队列不能访问 25 //参数4:是否自动删除 26 //参数5:我们传入的其他参数 27 channel.queueDeclare(QUEUE, false, false, false, null); 28 //发送内容 29 for (int i = 0; i < 100; i++) { 30 channel.basicPublish("", QUEUE, null, ("要发送的消息" + i).getBytes()); 31 } 32 //关闭连接 33 channel.close(); 34 connection.close(); 35 } 36 }
定义2个消息消费者
1 package com.edu.work; 2 3 import com.edu.util.ConnectionUtil; 4 import com.rabbitmq.client.*; 5 6 import java.io.IOException; 7 import java.util.Queue; 8 9 /** 10 * @ClassName Recver1 11 * @Deccription 消息接受者 12 * @Author DZ 13 * @Date 2019/5/4 12:58 14 **/ 15 public class Recver1 { 16 private final static String QUEUE = "testhellowork";//消息队列的名称 17 18 public static void main(String[] args) throws Exception { 19 Connection connection = ConnectionUtil.getConnection(); 20 final Channel channel = connection.createChannel(); 21 channel.queueDeclare(QUEUE, false, false, false, null); 22 //channel.basicQos(1);//告诉服务器,当前消息没有确认之前,不要发送新消息,合理自动分配资源 23 DefaultConsumer defaultConsumer = new DefaultConsumer(channel) { 24 @Override 25 public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { 26 //收到消息时候调用 27 System.out.println("消费者1收到的消息:" + new String(body)); 28 /*super.handleDelivery(consumerTag, envelope, properties, body);*/ 29 //确认消息 30 //参数2:false为确认收到消息,ture为拒绝收到消息 31 channel.basicAck(envelope.getDeliveryTag(), false); 32 } 33 }; 34 //注册消费者 35 // 参数2:手动确认,我们收到消息后,需要手动确认,告诉服务器,我们收到消息了 36 channel.basicConsume(QUEUE, false, defaultConsumer); 37 } 38 }
1 package com.edu.work; 2 3 import com.edu.util.ConnectionUtil; 4 import com.rabbitmq.client.*; 5 6 import java.io.IOException; 7 8 /** 9 * @ClassName Recver1 10 * @Deccription 消息接受者 11 * @Author DZ 12 * @Date 2019/5/4 12:58 13 **/ 14 public class Recver2 { 15 private final static String QUEUE = "testhellowork";//消息队列的名称 16 17 public static void main(String[] args) throws Exception { 18 Connection connection = ConnectionUtil.getConnection(); 19 final Channel channel = connection.createChannel(); 20 channel.queueDeclare(QUEUE, false, false, false, null); 21 //channel.basicQos(1); 22 DefaultConsumer defaultConsumer = new DefaultConsumer(channel) { 23 @Override 24 public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { 25 //收到消息时候调用 26 System.out.println("消费者2收到的消息:" + new String(body)); 27 /*super.handleDelivery(consumerTag, envelope, properties, body);*/ 28 //确认消息 29 //参数2:false为确认收到消息,ture为拒绝收到消息 30 channel.basicAck(envelope.getDeliveryTag(), false); 31 } 32 }; 33 //注册消费者 34 // 参数2:手动确认,我们收到消息后,需要手动确认,告诉服务器,我们收到消息了 35 channel.basicConsume(QUEUE, false, defaultConsumer); 36 } 37 }
这种模式是最简单的 work 模式,消息发送者,循环发送了100次消息,打印结果如下:
4.3.public模式
定义消息发布者
1 package com.edu.publish; 2 3 import com.edu.util.ConnectionUtil; 4 import com.rabbitmq.client.Channel; 5 import com.rabbitmq.client.Connection; 6 7 /** 8 * @ClassName Sender 9 * @Deccription TODO 10 * @Author DZ 11 * @Date 2019/5/4 14:43 12 **/ 13 public class Sender { 14 private final static String EXCHANGE_NAME = "testexchange";//定义交换机名字 15 16 public static void main(String[] args) throws Exception { 17 Connection connection = ConnectionUtil.getConnection(); 18 Channel channel = connection.createChannel(); 19 //声明交换机 20 //定义一个交换机,类型为fanout,也就是发布订阅者模式 21 channel.exchangeDeclare(EXCHANGE_NAME, "fanout"); 22 //发布订阅模式,因为消息是先发布到交换机中,而交换机是没有保存功能的,所以如果没有消费者,消息会丢失 23 channel.basicPublish(EXCHANGE_NAME, "", null, "发布订阅模式的消息".getBytes()); 24 channel.close(); 25 connection.close(); 26 } 27 }
定义2个消息消费者
1 package com.edu.publish; 2 3 import com.edu.util.ConnectionUtil; 4 import com.rabbitmq.client.*; 5 6 import java.io.IOException; 7 8 /** 9 * @ClassName Recver1 10 * @Deccription TODO 11 * @Author DZ 12 * @Date 2019/5/4 14:49 13 **/ 14 public class Recver1 { 15 //定义交换机 16 private final static String EXCHANGE_NAME = "testexchange"; 17 private final static String QUEUE = "testpubqueue1"; 18 19 public static void main(String[] args) throws Exception { 20 Connection connection = ConnectionUtil.getConnection(); 21 final Channel channel = connection.createChannel(); 22 channel.queueDeclare(QUEUE, false, false, false, null); 23 //绑定队列到交换机 24 channel.queueBind(QUEUE, EXCHANGE_NAME, ""); 25 channel.basicQos(1); 26 DefaultConsumer defaultConsumer = new DefaultConsumer(channel) { 27 @Override 28 public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { 29 /* super.handleDelivery(consumerTag, envelope, properties, body);*/ 30 System.out.println("消费者1:" + new String(body)); 31 channel.basicAck(envelope.getDeliveryTag(), false); 32 } 33 }; 34 channel.basicConsume(QUEUE, false, defaultConsumer); 35 } 36 }
1 package com.edu.publish; 2 3 import com.edu.util.ConnectionUtil; 4 import com.rabbitmq.client.*; 5 6 import java.io.IOException; 7 8 /** 9 * @ClassName Recver1 10 * @Deccription TODO 11 * @Author DZ 12 * @Date 2019/5/4 14:49 13 **/ 14 public class Recver2 { 15 //定义交换机 16 private final static String EXCHANGE_NAME = "testexchange"; 17 private final static String QUEUE = "testpubqueue2"; 18 19 public static void main(String[] args) throws Exception { 20 Connection connection = ConnectionUtil.getConnection(); 21 final Channel channel = connection.createChannel(); 22 channel.queueDeclare(QUEUE, false, false, false, null); 23 //绑定队列到交换机 24 channel.queueBind(QUEUE, EXCHANGE_NAME, ""); 25 channel.basicQos(1); 26 DefaultConsumer defaultConsumer = new DefaultConsumer(channel) { 27 @Override 28 public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { 29 /* super.handleDelivery(consumerTag, envelope, properties, body);*/ 30 System.out.println("消费者2:" + new String(body)); 31 channel.basicAck(envelope.getDeliveryTag(), false); 32 } 33 }; 34 channel.basicConsume(QUEUE, false, defaultConsumer); 35 } 36 }
消费者1 和消费者2 都监听了被同一个交换器绑定的队列,因此消息被同时消费到了。如果消息发送到没有队列绑定的交换器时,消息将丢失,因为交换器没有存储消息的能力,消息只能存储在队列中。
应用场景:比如一个商城系统需要在管理员上传商品新的图片时,前台系统必须更新图片,日志系统必须记录相应的日志,那么就可以将两个队列绑定到图片上传交换器上,一个用于前台系统更新图片,另一个用于日志系统记录日志。
4.4.routing
定义消息发布者
1 package com.edu.route; 2 3 import com.edu.util.ConnectionUtil; 4 import com.rabbitmq.client.Channel; 5 import com.rabbitmq.client.Connection; 6 7 /** 8 * @ClassName Sender 9 * @Deccription TODO 10 * @Author DZ 11 * @Date 2019/5/4 15:05 12 **/ 13 public class Sender { 14 private final static String EXCANGE_NAME = "testroute"; 15 16 public static void main(String[] args) throws Exception { 17 Connection connection = ConnectionUtil.getConnection(); 18 Channel channel = connection.createChannel(); 19 //定义路由格式的交换机 20 channel.exchangeDeclare(EXCANGE_NAME, "direct"); 21 channel.basicPublish(EXCANGE_NAME, "key2", null, "路由模式的消息".getBytes()); 22 channel.close(); 23 connection.close(); 24 } 25 }
1 package com.edu.route; 2 3 import com.edu.util.ConnectionUtil; 4 import com.rabbitmq.client.*; 5 6 import java.io.IOException; 7 8 /** 9 * @ClassName Recver1 10 * @Deccription TODO 11 * @Author DZ 12 * @Date 2019/5/4 14:49 13 **/ 14 public class Recver1 { 15 //定义交换机 16 private final static String EXCHANGE_NAME = "testroute"; 17 private final static String QUEUE = "testroute1queue"; 18 19 public static void main(String[] args) throws Exception { 20 Connection connection = ConnectionUtil.getConnection(); 21 final Channel channel = connection.createChannel(); 22 channel.queueDeclare(QUEUE, false, false, false, null); 23 //绑定队列到交换机 24 //参数3:绑定到交换机指定的路由的名字 25 channel.queueBind(QUEUE, EXCHANGE_NAME, "key1"); 26 //如果需要绑定多个路由,再绑定一次即可 27 channel.queueBind(QUEUE, EXCHANGE_NAME, "key2"); 28 channel.basicQos(1); 29 DefaultConsumer defaultConsumer = new DefaultConsumer(channel) { 30 @Override 31 public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { 32 /* super.handleDelivery(consumerTag, envelope, properties, body);*/ 33 System.out.println("消费者1:" + new String(body)); 34 channel.basicAck(envelope.getDeliveryTag(), false); 35 } 36 }; 37 channel.basicConsume(QUEUE, false, defaultConsumer); 38 } 39 }
1 package com.edu.route; 2 3 import com.edu.util.ConnectionUtil; 4 import com.rabbitmq.client.*; 5 6 import java.io.IOException; 7 8 /** 9 * @ClassName Recver1 10 * @Deccription TODO 11 * @Author DZ 12 * @Date 2019/5/4 14:49 13 **/ 14 public class Recver2 { 15 //定义交换机 16 private final static String EXCHANGE_NAME = "testroute"; 17 private final static String QUEUE = "testroute2queue"; 18 19 public static void main(String[] args) throws Exception { 20 Connection connection = ConnectionUtil.getConnection(); 21 final Channel channel = connection.createChannel(); 22 channel.queueDeclare(QUEUE, false, false, false, null); 23 //绑定队列到交换机 24 //参数3:绑定到交换机指定的路由的名字 25 channel.queueBind(QUEUE, EXCHANGE_NAME, "key1"); 26 //如果需要绑定多个路由,再绑定一次即可 27 channel.queueBind(QUEUE, EXCHANGE_NAME, "key3"); 28 channel.basicQos(1); 29 DefaultConsumer defaultConsumer = new DefaultConsumer(channel) { 30 @Override 31 public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { 32 /* super.handleDelivery(consumerTag, envelope, properties, body);*/ 33 System.out.println("消费者2:" + new String(body)); 34 channel.basicAck(envelope.getDeliveryTag(), false); 35 } 36 }; 37 channel.basicConsume(QUEUE, false, defaultConsumer); 38 } 39 }
应用场景:利用消费者能够有选择性的接收消息的特性,比如我们商城系统的后台管理系统对于商品进行修改、删除、新增操作都需要更新前台系统的界面展示,而查询操作确不需要,那么这两个队列分开接收消息就比较好。
4.5.Topic
1 package com.edu.topic; 2 3 import com.edu.util.ConnectionUtil; 4 import com.rabbitmq.client.Channel; 5 import com.rabbitmq.client.Connection; 6 7 /** 8 * @ClassName Sender 9 * @Deccription TODO 10 * @Author DZ 11 * @Date 2019/5/4 15:19 12 **/ 13 public class Sender { 14 private final static String EXCANGE_NAME = "testtopexchange"; 15 16 public static void main(String[] args) throws Exception { 17 Connection connection = ConnectionUtil.getConnection(); 18 Channel channel = connection.createChannel(); 19 channel.exchangeDeclare(EXCANGE_NAME, "topic"); 20 channel.basicPublish(EXCANGE_NAME, "abc.adb.1", null, "topic模式消息发送者:".getBytes()); 21 channel.close(); 22 connection.close(); 23 } 24 }
1 package com.edu.topic; 2 3 import com.edu.util.ConnectionUtil; 4 import com.rabbitmq.client.*; 5 6 import java.io.IOException; 7 8 /** 9 * @ClassName Recver1 10 * @Deccription TODO 11 * @Author DZ 12 * @Date 2019/5/4 14:49 13 **/ 14 public class Recver1 { 15 //定义交换机 16 private final static String EXCHANGE_NAME = "testtopexchange"; 17 private final static String QUEUE = "testtopic1queue"; 18 19 public static void main(String[] args) throws Exception { 20 Connection connection = ConnectionUtil.getConnection(); 21 final Channel channel = connection.createChannel(); 22 channel.queueDeclare(QUEUE, false, false, false, null); 23 //绑定队列到交换机 24 //参数3:绑定到交换机指定的路由的名字 25 channel.queueBind(QUEUE, EXCHANGE_NAME, "key.*"); 26 //如果需要绑定多个路由,再绑定一次即可 27 channel.queueBind(QUEUE, EXCHANGE_NAME, "abc.*"); 28 channel.basicQos(1); 29 DefaultConsumer defaultConsumer = new DefaultConsumer(channel) { 30 @Override 31 public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { 32 /* super.handleDelivery(consumerTag, envelope, properties, body);*/ 33 System.out.println("消费者1:" + new String(body)); 34 channel.basicAck(envelope.getDeliveryTag(), false); 35 } 36 }; 37 channel.basicConsume(QUEUE, false, defaultConsumer); 38 } 39 }
1 package com.edu.topic; 2 3 import com.edu.util.ConnectionUtil; 4 import com.rabbitmq.client.*; 5 6 import java.io.IOException; 7 8 /** 9 * @ClassName Recver1 10 * @Deccription TODO 11 * @Author DZ 12 * @Date 2019/5/4 14:49 13 **/ 14 public class Recver2 { 15 //定义交换机 16 private final static String EXCHANGE_NAME = "testtopexchange"; 17 private final static String QUEUE = "testtopic2queue"; 18 19 public static void main(String[] args) throws Exception { 20 Connection connection = ConnectionUtil.getConnection(); 21 final Channel channel = connection.createChannel(); 22 channel.queueDeclare(QUEUE, false, false, false, null); 23 //绑定队列到交换机 24 //参数3:绑定到交换机指定的路由的名字 25 channel.queueBind(QUEUE, EXCHANGE_NAME, "key.*"); 26 //如果需要绑定多个路由,再绑定一次即可 27 channel.queueBind(QUEUE, EXCHANGE_NAME, "abc.#"); 28 channel.basicQos(1); 29 DefaultConsumer defaultConsumer = new DefaultConsumer(channel) { 30 @Override 31 public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { 32 /* super.handleDelivery(consumerTag, envelope, properties, body);*/ 33 System.out.println("消费者2:" + new String(body)); 34 channel.basicAck(envelope.getDeliveryTag(), false); 35 } 36 }; 37 channel.basicConsume(QUEUE, false, defaultConsumer); 38 } 39 }
第六种模式是将上述的模式集成其它的框架,进行远程访问,这里我们将集成 Spring 实现 RCP 远程模式的使用
5.Spring 集成 RabbitMQ
5.1.自动集成 Spring
编写spring的配置,此配置文件的目的是将 Spring 与 RabbitMQ 进行整合,实际上就是将 MQ 的相关信息(连接,队列,交换机……)通过XML配置的方式实现
1 <beans xmlns="http://www.springframework.org/schema/beans" 2 xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" 3 xmlns:rabbit="http://www.springframework.org/schema/rabbit" 4 xsi:schemaLocation="http://www.springframework.org/schema/rabbit 5 http://www.springframework.org/schema/rabbit/spring-rabbit-1.7.xsd 6 http://www.springframework.org/schema/beans 7 http://www.springframework.org/schema/beans/spring-beans-4.3.xsd"> 8 <!--定义连接工厂--> 9 <rabbit:connection-factory id="connectionFactory" host="127.0.0.1" port="5672" username="test" password="test" 10 virtual-host="/test"/> 11 <!-- 12 定义模板 13 第三个参数,决定消息发送到哪里,如果为exchange,则发送到交换机;如果为queue,则发送到队列 14 --> 15 <rabbit:template id="template" connection-factory="connectionFactory" exchange="fanoutExchange"/> 16 <rabbit:admin connection-factory="connectionFactory"/> 17 <!--定义队列--> 18 <rabbit:queue name="myQueue" auto-declare="true"/> 19 <!--定义交换机--> 20 <rabbit:fanout-exchange name="fanoutExchange" auto-declare="true"> 21 <!--将消息绑定到交换机--> 22 <rabbit:bindings> 23 <rabbit:binding queue="myQueue"> 24 25 </rabbit:binding> 26 </rabbit:bindings> 27 </rabbit:fanout-exchange> 28 <!--定义监听器,收到消息会执行--> 29 <rabbit:listener-container connection-factory="connectionFactory"> 30 <!-- 定义监听的类和方法--> 31 <rabbit:listener ref="consumer" method="test" queue-names="myQueue"/> 32 </rabbit:listener-container> 33 <!--定义消费者--> 34 <bean id="consumer" class="com.edu.spring.MyConsumer"/> 35 36 </beans>
生产者:
1 package com.edu.spring; 2 3 import org.springframework.amqp.rabbit.core.RabbitTemplate; 4 import org.springframework.context.ApplicationContext; 5 import org.springframework.context.support.ClassPathXmlApplicationContext; 6 7 /** 8 * @ClassName SpringTest 9 * @Deccription TODO 10 * @Author DZ 11 * @Date 2019/5/4 18:40 12 **/ 13 public class SpringTest { 14 public static void main(String[] args) throws Exception { 15 ApplicationContext applicationContext = new ClassPathXmlApplicationContext("classpath:applicationContext.xml"); 16 RabbitTemplate rabbitTemplate = applicationContext.getBean(RabbitTemplate.class); 17 rabbitTemplate.convertAndSend("Spring的消息"); 18 ((ClassPathXmlApplicationContext) applicationContext).destroy(); 19 } 20 }
消费者
1 package com.edu.spring; 2 3 /** 4 * @ClassName MyConsumer 5 * @Deccription TODO 6 * @Author DZ 7 * @Date 2019/5/4 18:35 8 **/ 9 public class MyConsumer { 10 /*用于接收消息*/ 11 public void test(String message) { 12 System.err.println(message); 13 } 14 }
集成Spring主要是在xml中实现了队列和交换机的创建。
5.2.手动模式
手动模式,主要增加MQ的回调操作,MQ消息失败或者成功就有相应的回调信息,增强系统的健壮性,一旦产生异常,很快就能定位到异常的位置,所以在实际开发中,一般都这种方式
创建xml配置文件
1 <beans xmlns="http://www.springframework.org/schema/beans" 2 xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" 3 xmlns:rabbit="http://www.springframework.org/schema/rabbit" 4 xmlns:context="http://www.springframework.org/schema/context" 5 xsi:schemaLocation="http://www.springframework.org/schema/rabbit 6 http://www.springframework.org/schema/rabbit/spring-rabbit-1.7.xsd 7 http://www.springframework.org/schema/beans 8 http://www.springframework.org/schema/beans/spring-beans-4.3.xsd 9 http://www.springframework.org/schema/context 10 http://www.springframework.org/schema/context/spring-context-4.3.xsd"> 11 <context:component-scan base-package="com.edu.spring2"/> 12 <bean id="jsonMessageConverter" class="org.springframework.amqp.support.converter.Jackson2JsonMessageConverter"/> 13 14 <!-- 15 定义连接工厂 16 publisher-confirms为ture,确认失败等回调才会执行 17 --> 18 <rabbit:connection-factory id="connectionFactory" host="127.0.0.1" port="5672" username="test" password="test" 19 virtual-host="/test" publisher-confirms="true"/> 20 21 <rabbit:admin connection-factory="connectionFactory"/> 22 <rabbit:template id="amqpTemplate" connection-factory="connectionFactory" confirm-callback="confirmCallBackListener" 23 return-callback="returnCallBackListener" 24 mandatory="true"/> 25 <!--定义队列--> 26 <rabbit:queue name="myQueue" auto-declare="true"/> 27 <!--定义交换机--> 28 <rabbit:direct-exchange name="DIRECT_EX" id="DIRECT_EX"> 29 <!--将消息绑定到交换机--> 30 <rabbit:bindings> 31 <rabbit:binding queue="myQueue"> 32 33 </rabbit:binding> 34 </rabbit:bindings> 35 </rabbit:direct-exchange> 36 <!--定义监听器,收到消息会执行--> 37 <rabbit:listener-container connection-factory="connectionFactory" acknowledge="manual"> 38 <!-- 定义监听的类和方法--> 39 <rabbit:listener queues="myQueue" ref="receiveConfirmTestListener"/> 40 </rabbit:listener-container> 41 42 </beans>
创建回调监听函数
1 package com.edu.spring2; 2 3 import org.springframework.amqp.rabbit.core.RabbitTemplate; 4 import org.springframework.amqp.rabbit.support.CorrelationData; 5 import org.springframework.stereotype.Component; 6 7 /** 8 * @ClassName ConfirmCallBackListener 9 * @Deccription TODO 10 * @Author DZ 11 * @Date 2019/5/4 22:26 12 **/ 13 @Component("confirmCallBackListener") 14 public class ConfirmCallBackListener implements RabbitTemplate.ConfirmCallback { 15 16 @Override 17 public void confirm(CorrelationData correlationData, boolean ack, String cause) { 18 System.out.println("确认回调 ack==" + ack + "回调原因==" + cause); 19 } 20 }
1 package com.edu.spring2; 2 3 import com.rabbitmq.client.Channel; 4 import org.springframework.amqp.core.Message; 5 import org.springframework.amqp.rabbit.core.ChannelAwareMessageListener; 6 import org.springframework.stereotype.Component; 7 8 /** 9 * @ClassName ReceiveConfirmTestListener 10 * @Deccription TODO 11 * @Author DZ 12 * @Date 2019/5/4 22:24 13 **/ 14 @Component("receiveConfirmTestListener") 15 public class ReceiveConfirmTestListener implements ChannelAwareMessageListener { 16 /** 17 * 收到消息时,执行的监听 18 * 19 * @param message 20 * @param channel 21 * @throws Exception 22 */ 23 @Override 24 public void onMessage(Message message, Channel channel) throws Exception { 25 System.out.println(("消费者收到了消息" + message)); 26 channel.basicAck(message.getMessageProperties().getDeliveryTag(),false); 27 } 28 }
1 package com.edu.spring2; 2 3 import org.springframework.amqp.core.Message; 4 import org.springframework.amqp.rabbit.core.RabbitTemplate; 5 import org.springframework.stereotype.Component; 6 7 /** 8 * @ClassName ReturnCallBackListener 9 * @Deccription TODO 10 * @Author DZ 11 * @Date 2019/5/4 22:28 12 **/ 13 @Component("returnCallBackListener") 14 public class ReturnCallBackListener implements RabbitTemplate.ReturnCallback { 15 @Override 16 public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) { 17 System.out.println("失败回调" + message); 18 } 19 }
回调函数的配置来自 XML
1 package com.edu.spring2; 2 3 import org.springframework.amqp.core.AmqpTemplate; 4 import org.springframework.beans.factory.annotation.Autowired; 5 import org.springframework.stereotype.Component; 6 7 /** 8 * @ClassName PublicUtil 9 * @Deccription TODO 10 * @Author DZ 11 * @Date 2019/5/4 22:30 12 **/ 13 @Component("publicUtil") 14 public class PublicUtil { 15 @Autowired 16 private AmqpTemplate amqpTemplate; 17 18 public void send(String excange, String routingkey, Object message) { 19 amqpTemplate.convertAndSend(excange, routingkey, message); 20 } 21 }
创建测试类
1 package com.edu.spring2; 2 3 import org.junit.Test; 4 import org.junit.runner.RunWith; 5 import org.springframework.beans.factory.annotation.Autowired; 6 import org.springframework.test.context.ContextConfiguration; 7 import org.springframework.test.context.junit4.SpringJUnit4ClassRunner; 8 9 /** 10 * @ClassName TestMain 11 * @Deccription TODO 12 * @Author DZ 13 * @Date 2019/5/4 22:32 14 **/ 15 @RunWith(SpringJUnit4ClassRunner.class) 16 @ContextConfiguration(locations = {"classpath:applicationContext2.xml"}) 17 public class TestMain { 18 @Autowired 19 private PublicUtil publicUtil; 20 private static String exChange = "DIRECT_EX";//交换机 21 private static String queue = "myQueue"; 22 23 /** 24 * exChange和queue均正确 25 * confirm会执行,ack = ture 26 * 消息正常接收(接收消息确认方法正常执行) 27 */ 28 @Test 29 public void test1() throws Exception { 30 publicUtil.send(exChange, queue, "测试1,队列和交换机均正确"); 31 } 32 /** 33 * exChange错误,queue正确 34 * confirm执行,ack=false 35 * 消息无法接收(接收消息确认方法不能执行) 36 */ 37 @Test 38 public void test2() throws Exception { 39 publicUtil.send(exChange + "1", queue, "测试2,队列正确,交换机错误"); 40 } 41 /** 42 * exChange正常,queue错误 43 * return执行 44 * confirm执行,ack=ture 45 */ 46 @Test 47 public void test3() throws Exception { 48 publicUtil.send(exChange, queue + "1", "测试2,队列错误,交换机正确"); 49 } 50 /** 51 * exChange错误,queue错误 52 * confirm执行,ack=false 53 */ 54 @Test 55 public void test4() throws Exception { 56 publicUtil.send(exChange + "1", queue + "1", "测试2,队列错误,交换机错误"); 57 } 58 }
测试结果如下:
-
test1:exChange和queue均正确
- confirm会执行,ack=ture;能正常收到消息(接收消息的方法正常执行)
-
test2:exChange错误,queue正确
- test3:exChange正确,queue错误
- test4:exChange和queue均错误
上述结论及代码如下图:
根据上述的测试结果,我们可以根据回调函数的返回结果,查看MQ的错误出现在那里。根据上述结论,我们可以对3个回调函数做如下处理:
-
类 ReceiveConfirmTestListener 中的onMessage方法主要用于接收从 RabbitMQ 推送过来的消息,并对消息做相应的逻辑处理
-
类 ConfirmCallBackListener 中的 confirm 方法主要用于检查交换机(exChange),当 ack=false,交换机可能错误
-
类 ReturnCallBackListener 中的 returnedMessage 方法用于检查队列(queue),当此方法执行时,队列可能错误
实际上,在真实项目中,上面3个方法也是按照这3个逻辑进行设计的。当然这3个方法中还可以加入更多的日志消息,和逻辑处理业务。
6.参考
https://blog.csdn.net/liu911025/article/details/80460182
https://blog.csdn.net/lyhkmm/article/details/78775369
https://blog.csdn.net/vbirdbest/article/details/78670550
https://blog.csdn.net/vbirdbest/article/details/78670550
https://www.rabbitmq.com/getstarted.html