springforall

SpringBoot RabbitMQ 整合使用

![](http://ww2.sinaimg.cn/large/006tNc79ly1g5jjb62t88j30u00gwdi2.jpg) ### 前提 上次写了篇文章,[《SpringBoot Kafka 整合使用》](https://mp.weixin.qq.com/s?__biz=MzU0MDEwMjgwNA==&mid=2247485807&idx=1&sn=846306cd5115e3bd16dd2c999db84f16&chksm=fb3f1094cc489982f3857fc2116723bb916f6790ba3f407d4fa5282dab4252d9129c8a3098f9&token=2068610699&lang=zh_CN#rd),阅读量还挺高的,于是想想还是把其他几种 MQ 也和 SpringBoot 整合使用下。 下面是四种比较流行的 MQ : ![](http://ww3.sinaimg.cn/large/006tNc79ly1g5jjbhev27j30fw072glz.jpg) 后面都写写和 SpringBoot 整合的文章。 ### 安装 RabbitMQ 由于换 Mac 了,所以一些环境就直接在 Mac 搞,但是像安装 RabbitMQ 这些又会把自己电脑系统给搞的太乱,所以能在 Docker 里面安装就安装在 Docker,这次 RabbitMQ 我也直接在 Docker 里安装。 启动 Docker for Mac,如果没安装过的请看我上一篇文章:http://www.54tianzhisheng.cn/2018/01/25/Docker-install/ 当然你也可以在自己的 Linux 服务器或者虚拟机里启动安装 RabbitMQ 。 Docker 安装的话很简单,因为 RabbitMQ 官方已经提供了自己的 Docker 容器,只需要一行命令:(可右移查看完整代码) ``` docker run -d -p 15672:15672 -p 5672:5672 -e RABBITMQ_DEFAULT_USER=admin -e RABBITMQ_DEFAULT_PASS=admin --name rabbitmq rabbitmq:3-management ``` 该镜像拥有一个基于 web 的控制台和 Http API。Http API 可以在地址看到如何使用:`http://localhost:15672/api/` 讲解下上面命令行: - 15672 :表示 RabbitMQ 控制台端口号,可以在浏览器中通过控制台来执行 RabbitMQ 的相关操作。 - 5672 : 表示 RabbitMQ 所监听的 TCP 端口号,应用程序可通过该端口与 RabbitMQ 建立 TCP 连接,并完成后续的异步消息通信 - RABBITMQDEFAULTUSER:用于设置登陆控制台的用户名,这里我设置 admin - RABBITMQDEFAULTPASS:用于设置登陆控制台的密码,这里我设置 admin 容器启动成功后,可以在浏览器输入地址:http://localhost:15672/ 访问控制台 ![](http://ww1.sinaimg.cn/large/006tNc79ly1g5jjbro4evj30js098jrv.jpg) 登陆后: ![](http://ww4.sinaimg.cn/large/006tNc79ly1g5jjc4moi5j30u00grabn.jpg) 简单描述下上图中中控制台的列表的作用: - Overview :用于查看 RabbitMQ 的一些基本信息(消息队列、消息发送速率、节点、端口和上下文信息等) - Connections:用于查看 RabbitMQ 客户端的连接信息 - Channels:用户查看 RabbitMQ 的通道信息 - Exchange:用于查看 RabbitMQ 交换机 - Queues:用于查看 RabbitMQ 的队列 - Admin:用于管理用户,可增加用户 ### 创建项目 在 IDEA 中创建一个 SpringBoot 项目结构: ![](http://ww2.sinaimg.cn/large/006tNc79ly1g5jjchb62cj30pk12qacv.jpg) SpringBoot 框架中已经内置了对 RabbitMQ 的支持,如果你看过官方文档的话,就可以看到的,我们需要把依赖 spring-boot-starter-amqp 引入就行。 1、 **pom.xml** 引入依赖后如下: ``` 4.0.0com.zhishengrabbitmq0.0.1-SNAPSHOTjarrabbitmqDemo project for Spring Boot RabbitMQorg.springframework.bootspring-boot-starter-parent1.5.9.RELEASEUTF-8UTF-81.8org.springframework.bootspring-boot-starter-weborg.springframework.bootspring-boot-starter-testtestorg.springframework.bootspring-boot-starter-amqporg.springframework.bootspring-boot-maven-plugin ``` 2、**application.properties** 配置修改如下: ``` spring.rabbitmq.addresses=localhost:5672 spring.rabbitmq.username=admin spring.rabbitmq.password=admin ``` 3、**消息发送类** RabbitMQClient.java ``` package com.zhisheng.rabbitmq.client; import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component; /** * Created by zhisheng_tian on 2018/1/23 */ @Componentpublic class RabbitMQClient { @Autowired private RabbitTemplate rabbitTemplate; public void send(String message) { rabbitTemplate.convertAndSend("zhisheng", message); } } ``` 就这样,发送消息代码就实现了。 这里关键的代码为 rabbitTemplate.convertAndSend() 方法, `zhisheng` 这个是路由规则(routingKey),它的值表明将消息发送到指定的队列 `zhisheng` 中去,这里跟了下源码,发现 convertAndSend() 方法最后调用的方法其实是一个 doSend() 方法。 ![](http://ww4.sinaimg.cn/large/006tNc79ly1g5jjctyd9xj30u00h6go5.jpg) 4、**消息接收类** ``` package com.zhisheng.rabbitmq.server; import org.springframework.amqp.rabbit.annotation.RabbitListener; import org.springframework.stereotype.Component; /** * Created by zhisheng_tian on 2018/1/23 */ @Componentpublic class RabbitMQServer { @RabbitListener(queues = "zhisheng") public void receive(String message) { System.out.println("收到的 message 是:" + message); } } ``` 你看,这里就有个 `RabbitListener` 一直在监听着队列 `zhisheng` 。 当然这个队列是必须要我们自己在应用程序中创建好,它不会像我之前写的文章 [《SpringBoot Kafka 整合使用》](https://mp.weixin.qq.com/s?__biz=MzU0MDEwMjgwNA==&mid=2247485807&idx=1&sn=846306cd5115e3bd16dd2c999db84f16&chksm=fb3f1094cc489982f3857fc2116723bb916f6790ba3f407d4fa5282dab4252d9129c8a3098f9&token=2068610699&lang=zh_CN#rd) 中的 Kafka 一样,Kafka 它会在用到队列的时候动态的创建,不需要我们提前创建好。 那么在 RabbitMQ 中该如何创建队列呢? ![](http://ww1.sinaimg.cn/large/006tNc79ly1g5jjd3nu76j30u00eldhz.jpg) 如上图所示:这样我们就创建好了一个 `zhisheng` 的队列,当程序开始运行时,消息接收类会持续监听队列 `zhisheng` 中即将到来的消息。 5、**运行项目** 需要在启动类中注入发送消息的类,并且提供 init 方法,在 init 方法中调用发送消息类的 send() 方法 ``` @PostConstructpublic void init() { rabbitMQClient.send("发送消息----zhisheng-----"); } ``` 需要注意的是:init() 方法带有 @PostConstruct 注解,被 @PostConstruct 修饰的方法会在构造函数之后执行。 启动项目就可以发现控制台已经接收到消息了。 ![](http://ww4.sinaimg.cn/large/006tNc79ly1g5jjdg4l3mj30u00jddk5.jpg) 6、**单线程测试性能** 看到上面图片中注释掉的代码没?那就是用来测试消息发送的性能的,我发送 10000 条消息看看总共耗时多少。 ![](http://ww3.sinaimg.cn/large/006tNc79ly1g5jjdq28paj30u00fh40o.jpg) 10000 条消息发送耗时:215ms。这是在单线程下,下次可以和其他的 MQ 测试对比下,并且也可以在多线程的环境下测试性能。 同时从控制台可以看到发送的速率: ![](http://ww2.sinaimg.cn/large/006tNc79ly1g5jjdyxk9fj30u00esn10.jpg) 7、**多线程测试性能** 开了10 个线程,每个线程发送 10000 条消息。 init 方法代码如下: ``` @PostConstruct public void init() { StopWatch stopWatch = new StopWatch(); stopWatch.start(); int threads = 10; ExecutorService executorService = Executors.newFixedThreadPool(threads); final CountDownLatch start = new CountDownLatch(1); final CountDownLatch end = new CountDownLatch(threads); for (int i = 0; i { try { start.await(); for (int i1 = 0; i1

posted on 2019-08-01 01:39 SpringForAll 阅读(...) 评论(...) 编辑 收藏

分类:

技术点:

相关文章: