本文,讲解 Spring Boot 如何集成 RabbitMQ,实现消息队列。

什么是 RabitMQ

RabbitMQ 是一个在 AMQP 基础上完整的,可复用的企业消息系统。

关于 RabbitMQ 的使用,可以阅读之前的 RabbitMQ 实战教程。

Spring Boot 整合 RabbitMQ

Spring Boot 整合 RabbitMQ 是非常容易,只需要两个步骤。

首先,在 pom.xml 中增加 RabbitMQ 依赖。

  1. <dependency>
  2. <groupId>org.springframework.boot</groupId>
  3. <artifactId>spring-boot-starter-amqp</artifactId>
  4. </dependency>

第二步,在 src/main/resources/application.properties 中配置信息。

  1. #rabbitmq
  2. spring.rabbitmq.host=localhost
  3. spring.rabbitmq.port=5672
  4. spring.rabbitmq.username=guest
  5. spring.rabbitmq.password=guest

实战演练

一个简单的实战开始

我们来实现一个简单的发送、接收消息。

Configuration

在 Spring Boot 中使用 @Bean 注册一个队列。

  1. @Configuration
  2. public class RabbitMQConfig {
  3. public static final String QUEUE_NAME = "spring-boot-simple";
  4. @Bean
  5. public Queue queue() {
  6. return new Queue(QUEUE_NAME);
  7. }
  8. }

消息生产者

创建消息生产者 Sender。通过注入 AmqpTemplate 接口的实例来实现消息的发送。

  1. @Service
  2. public class Sender {
  3. @Autowired
  4. private AmqpTemplate rabbitTemplate;
  5. public void send() {
  6. System.out.println("梁桂钊 发送消息...");
  7. rabbitTemplate.convertAndSend(RabbitMQConfig.QUEUE_NAME, "你好, 梁桂钊!");
  8. }
  9. }

消息消费者

创建消息消费者 Receiver。通过 @RabbitListener 注解定义对队列的监听。

  1. @Service
  2. public class Receiver {
  3. @Autowired
  4. private AmqpTemplate rabbitTemplate;
  5. @RabbitListener(queues = "spring-boot-simple")
  6. public void receiveMessage(String message) {
  7. System.out.println("Received <" + message + ">");
  8. }
  9. }

运行

  1. @SpringBootApplication
  2. @EnableAutoConfiguration
  3. @ComponentScan(basePackages = { "com.lianggzone.springboot" })
  4. public class RunMain {
  5. public static void main(String[] args) {
  6. SpringApplication.run(RunMain.class, args);
  7. }
  8. }

单元测试

创建单元测试用例

  1. public class RabbitMQTest {
  2. @Autowired
  3. private Sender sender;
  4. @Test
  5. public void send() throws Exception {
  6. sender.send();
  7. }
  8. }

路由的实战演练

经过上面的实战案例,我们对 Spring Boot 整合 RabbitMQ 有了一定的了解。现在,我们再来看下 RabbitMQ 路由场景。

Configuration

在 RabbitMQConfig 中,我们注册 队列,转发器,监听等。

  1. @Configuration
  2. public class RabbitMQConfig2 {
  3.  
  4. public static final String QUEUE_NAME = "spring-boot";
  5. public static final String QUEUE_EXCHANGE_NAME = "spring-boot-exchange";
  6.  
  7. @Bean
  8. public Queue queue() {
  9. // 是否持久化
  10. boolean durable = true;
  11. // 仅创建者可以使用的私有队列,断开后自动删除
  12. boolean exclusive = false;
  13. // 当所有消费客户端连接断开后,是否自动删除队列
  14. boolean autoDelete = false;
  15. return new Queue(QUEUE_NAME, durable, exclusive, autoDelete);
  16. }
  17.  
  18. @Bean
  19. public TopicExchange exchange() {
  20. // 是否持久化
  21. boolean durable = true;
  22. // 当所有消费客户端连接断开后,是否自动删除队列
  23. boolean autoDelete = false;
  24. return new TopicExchange(QUEUE_EXCHANGE_NAME, durable, autoDelete);
  25. }
  26.  
  27. @Bean
  28. public Binding binding(Queue queue, TopicExchange exchange) {
  29. return BindingBuilder.bind(queue).to(exchange).with(QUEUE_NAME);
  30. }
  31.  
  32. @Bean
  33. SimpleMessageListenerContainer container(ConnectionFactory connectionFactory,
  34. MessageListenerAdapter listenerAdapter) {
  35. SimpleMessageListenerContainer container = new SimpleMessageListenerContainer();
  36. container.setConnectionFactory(connectionFactory);
  37. container.setQueueNames(QUEUE_NAME);
  38. container.setMessageListener(listenerAdapter);
  39. return container;
  40. }
  41.  
  42. @Bean
  43. MessageListenerAdapter listenerAdapter(Receiver receiver) {
  44. return new MessageListenerAdapter(receiver, "receiveMessage");
  45. }
  46. }

消息生产者

创建消息生产者 Sender。通过注入 AmqpTemplate 接口的实例来实现消息的发送。

  1. @Service
  2. public class Sender {
  3.  
  4. @Autowired
  5. private AmqpTemplate rabbitTemplate;
  6.  
  7. public void send() {
  8. System.out.println("梁桂钊 发送消息...");
  9. rabbitTemplate.convertAndSend(RabbitMQConfig2.QUEUE_NAME, "你好, 梁桂钊!");
  10. }
  11. }

消息消费者

创建消息消费者 Receiver。通过 @RabbitListener 注解定义对队列的监听。

  1. @Service
  2. public class Receiver {
  3.  
  4. public void receiveMessage(String message) {
  5. System.out.println("Received <" + message + ">");
  6. }
  7. }

运行

  1. @SpringBootApplication
  2. @EnableAutoConfiguration
  3. @ComponentScan(basePackages = { "com.lianggzone.springboot" })
  4. public class RunMain {
  5. public static void main(String[] args) {
  6. SpringApplication.run(RunMain.class, args);
  7. }
  8. }

单元测试

创建单元测试用例

  1. public class RabbitMQTest {
  2. @Autowired
  3. private Sender sender;
  4. @Test
  5. public void send() throws Exception {
  6. sender.send();
  7. }
  8. }

源代码

相关示例完整代码: springboot-action

(完)

 

Spring Boot 揭秘与实战(六) 消息队列篇 - RabbitMQ

相关文章: