RabbitMq 的消息交换器有三种常用的交换模式,分别是
- direct 订阅模式,发布与订阅,完全匹配
- fanout 广播模式
- topic 主题,规则匹配
今天这篇文章介绍 rabbitmq 如何基于 direct 模式进行发送消息
这里我们准备两个项目。
- 消息生产项目(rabbit-provider)
- 消息接收项目(rabbit-consumer)
模拟项目中统一处理日志的效果,消息生产项目发送 info 和 error 级别的日志
消息接收项目对接收到的消息进行处理。
介绍一下流程:
消息生产者生成消息,绑定 交换器 和 路由键 进行发送,消息接收者通过 绑定 队列,交换器,路由键 进行消息接收。
此处,rabbit 先 通过确实同一个 交换器的 消息,在进行确定 路由键,来绑定 消息的生产者和消息的消费者。
1.消息接收项目(rabbit-consumer)
1.yml配置
spring:
rabbitmq:
host: 127.0.0.1
port: 5672
username: zhangzq
password: ******
# 1.交换机名称
# 2.队列名称
# 路由键
# 设置交换机的名称
mq:
config:
exchange: log.direct
queue:
# info 队列名称
info: log.info
# info 队列的路由键
info.routing.key: log.info.routing.key
# error 队列名称
error: log.error
# error 队列的路由键
error.routing.key: log.error.routing.key
2.pom.xml配置
和https://blog.csdn.net/yali_aini/article/details/87544229 这篇文章上的 配置一样,只需要添加 rabbitmq 的依赖即可
3.消息接受类编写
InfoConsumer
使用 @RabbitListener 进行队列绑定,具体使用 @QueueBinding 的 value(队列),exchange(交换器) , key(路由键)进行队列绑定
具体代码如***意看注释
import org.springframework.amqp.core.ExchangeTypes;
import org.springframework.amqp.rabbit.annotation.*;
import org.springframework.context.annotation.Configuration;
/**
* 消息接收者,消费者
*/
@Configuration
// 消息监听
@RabbitListener(
// 绑定队列
bindings = {
// 绑定队列的类
@QueueBinding(
// 1.队列名称 2.自动删除,配不配无所谓
value = @Queue(value = "${mq.config.queue.info}",autoDelete = "true"),
// 消息交换器 1.交换器名称,2.交换器类型
exchange = @Exchange(value = "${mq.config.exchange}" , type = ExchangeTypes.DIRECT),
// 消息路由键
key = "${mq.config.queue.info.routing.key}"
)
}
)
public class InfoConsumer {
/**
* 接收消息的处理方法,带上 @RabbitHandler 表示该队列监听到的消息由该方法处理
* @param msg
*/
@RabbitHandler
public void infoReceiver(String msg){
System.out.println( "【INFO】消息接收成功,消息是:【"+ msg +"】" );
}
}
ErrorConsumer
import org.springframework.amqp.core.ExchangeTypes;
import org.springframework.amqp.rabbit.annotation.*;
import org.springframework.context.annotation.Configuration;
/**
* 消息接收类
*/
@Configuration
@RabbitListener(
// 队列绑定
bindings = {
@QueueBinding(
// 绑定队列 1.队列名称,2队列是否自动删除
value = @Queue(value = "${mq.config.queue.error}",autoDelete = "true"),
// 绑定交换器 1.交换器名称,2交换器类型
exchange = @Exchange(value = "${mq.config.exchange}",type = ExchangeTypes.DIRECT),
// 绑定 路由键
key = "${mq.config.queue.error.routing.key}"
)
}
)
public class ErrorConsumer {
/**
* 接收消息的处理方法,带上 @RabbitHandler 表示该队列监听到的消息由该方法处理
* @param msg
*/
@RabbitHandler
public void errorReceiver(String msg){
System.err.println( "【ERROR】消息接收成功,消息是:【"+ msg +"】" );
}
}
2.消息生产项目(rabbit-provider)
1.yml配置
此处的 交换器,和队列名称,和路由键要和 消息消费项目的 配置相对应,消息消费项目就是依靠 交换器 和路由键来绑定消息的。
spring:
rabbitmq:
host: 127.0.0.1
port: 5672
username: zhangzq
password: ******
mq:
config:
# 交换器名称
exchange: log.direct
# 队列名称
queue:
# info 队列名称
#info: log.info
# info 队列路由键名称
info.routing.key: log.info.routing.key
# error 队列名称
#error: log.error
# error 队列路由键名称
error.routing.key: log.error.routing.key
2.pom.xml配置
和 消息消费者的一样
3.消息生产类
生产类相对于消费类来说比较简单,和上篇文章的 rabbit入门操作类似。就是发送消息的时候 使用三个参数的,通过 交换器,路由键,消息, 三个参数发送。
ErrorSender
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Configuration;
@Configuration
public class ErrorSender {
private final static Logger log = LoggerFactory.getLogger(ErrorSender.class);
@Autowired
private AmqpTemplate amqpTemplate;
// 交换器名称
@Value("${mq.config.exchange}")
private String exchange;
// 路由键名称
@Value("${mq.config.queue.error.routing.key}")
private String routingkey;
// 发送消息的方法
public void send(String msg){
// 三个参数
/**
* 1.交换器的名称
* 2.路由键的名称
* 3.消息
*/
this.amqpTemplate.convertAndSend(this.exchange, this.routingkey, msg);
log.info("【ERROR】消息发送成功!exchange:【{}】,routingkey:【{}】",this.exchange,this.routingkey);
}
}
InfoSender
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Configuration;
@Configuration
public class InfoSender {
private final static Logger log = LoggerFactory.getLogger(InfoSender.class);
@Autowired
private AmqpTemplate amqpTemplate;
// 交换器名称
@Value("${mq.config.exchange}")
private String exchange;
// 路由键名称
@Value("${mq.config.queue.info.routing.key}")
private String routingkey;
// 发送消息的方法
public void send(String msg){
// 三个参数
/**
* 1.交换器的名称
* 2.路由键的名称
* 3.消息
*/
this.amqpTemplate.convertAndSend(this.exchange, this.routingkey, msg);
log.info("【INFO】消息发送成功!exchange:【{}】,routingkey:【{}】",this.exchange,this.routingkey);
}
}
3.效果
我这里先启动,消息接收项目,然后启动项目发送项目,在消息发送的项目里面每个 10秒钟 进行消息发送。然后消息接收项目接收到消息后进行输出。
@RequestMapping("/rabbit")
public String send(){
while(true){
try{
Thread.sleep(1000*10);
}catch (Exception e){
e.printStackTrace();
}
this.errorSender.send(new Date()+"<===>"+Math.random()*10000 +"" );
this.infoSender.send(new Date()+"<===>"+Math.random()*10000 +"" );
}
}