与springboot整合
Spring-amqp是对AMQP协议的抽象实现,而spring-rabbit 是对协议的具体实现,也是目前的唯一实现。
1、pom.xml 添加AMQP的启动器:(我已配置)
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
2、在application.yml中添加RabbitMQ地址:
修改为自己的
spring:
rabbitmq:
host: 192.168.56.101
username: leyou
password: leyou
virtual-host: /leyou
3、监听者 Listener 类
需和application 同级报以下,不然无法扫描到 @Component 注解
@RabbitListener:方法上的注解,声明这个方法是一个消费者方法,需要指定下面的属性:
bindings:指定绑定关系,可以有多个。值是@QueueBinding的数组。@QueueBinding包含下面属性:
value:这个消费者关联的队列。值是@Queue,代表一个队列
exchange:队列所绑定的交换机,值是@Exchange类型
key:队列和交换机绑定的RoutingKey
类似listen这样的方法在一个类中可以写多个,就代表多个消费者。
import org.springframework.amqp.core.ExchangeTypes;
import org.springframework.amqp.rabbit.annotation.Exchange;
import org.springframework.amqp.rabbit.annotation.Queue;
import org.springframework.amqp.rabbit.annotation.QueueBinding;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
/**
* 监听者
*
* @RabbitListener:方法上的注解,声明这个方法是一个消费者方法,需要指定下面的属性: bindings:指定绑定关系,可以有多个。值是@QueueBinding的数组。@QueueBinding包含下面属性:
* value:这个消费者关联的队列。值是@Queue,代表一个队列
* exchange:队列所绑定的交换机,值是@Exchange类型
* key: 队列和交换机绑定的RoutingKey
* 类似listen这样的方法在一个类中可以写多个,就代表多个消费者。
**/
@Component
public class Listener {
//接收消息方法,可以指定多个bindings,类中可以存在多个 listen方法,代表多个消费者
@RabbitListener(bindings = @QueueBinding(value = @Queue(
// bindings下的value : 队列
value = "spring.test.queue", durable = "true"), exchange = @Exchange(
// exchange :交换机
value = "spring.test.exchange",
ignoreDeclarationExceptions = "true",
type = ExchangeTypes.TOPIC),
// key:队列和交换机绑定的RoutingKey
key = {"#.#"}))
public void listen(String msg) {
System.out.println("接收到消息:" + msg);
}
}
/*
* 测试,在test 测试目录里面,启动项目查看日志
* src\test\java\com\ws\xdy\springbootrabbitmqdemo\SpringBootRabbitmqDemoApplicationTests.java
* */
4、测试
Spring为AMQP提供了统一的消息处理模板:AmqpTemplate,非常方便的发送消息,其发送方法:
Controller ,service,dao层 都可以任意调用
@Autowired
private AmqpTemplate amqpTemplate;
在测试目录下建立测试类
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;
@RunWith(value = SpringRunner.class)
@SpringBootTest
public class MqDemoTest {
@Autowired
private AmqpTemplate amqpTemplate;
@Test
public void testSend() throws InterruptedException {
String msg = "hello, Spring boot amqp";
/*
* 1个参数 :指定消息
* 2个参数 :指定Routingkey、和消息(发送到默认队列)
* 3个参数 :指定交换机、Routingkey、和消息
* */
this.amqpTemplate.convertAndSend("spring.test.exchange","a.b", msg);
// 等待10秒后再结束
Thread.sleep(10000);
}
}
运行测试方法,获得输出消息
个人项目demo :https://pan.baidu.com/s/10UjZTu6yiDU8EiXDeZT8WQ 提取码:dt9w
1、各模式代码处
2、Springboot 整合代码放置处