环境
win7
rabbitmq-server-3.7.17
Erlang 22.1
一、概念
1、队列
队列用于临时存储消息和转发消息。
队列类型有两种,即时队列和延时队列。
即时队列:队列中的消息会被立即消费;
延时队列:队列中的消息会在指定的时间延时之后被消费。
2、交换机
交换机的功能主要是接收消息并且转发到绑定的队列,交换机不存储消息,在启用ack模式后,交换机找不到队列会返回错误。
交换机有四种类型:Direct, topic, Headers and Fanout。
Direct[精确匹配类型]:Direct是RabbitMQ默认的交换机模式,先匹配, 再投送。即创建消息队列的时候,指定一个BindingKey.当发送者发送消息的时候,指定对应的Key.当Key和消息队列的BindingKey一致的时候,消息将会被发送到该消息队列中.
Topic[模式匹配]:按通配符匹配规则转发消息(最灵活),队列和交换机的绑定主要是依据一种模式(通配符+字符串),而当发送消息的时候,只有指定的Key和该模式相匹配的时候,消息才会被发送到该消息队列中.
Headers[键值对匹配]:设置header attribute参数类型的交换机。
消息队列和交换机绑定的时候会指定一组键值对规则,而发送消息的时候也会指定一组键值对规则,当两组键值对规则相匹配的时候,消息会被发送到匹配的消息队列中.
Fanout[转发消息最快]:
路由广播的形式,简单的将队列绑定到交换机上将会把消息发给绑定它的全部队列,即便设置了key,也会被忽略.
3、使用spring boot和rabbitmq整合 搭建演示工程
二、Direct Exchange-Work模式
配置类:
package com.wjy.direct; import org.springframework.amqp.core.Queue; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; //@Configuration这个注解是必须的,保证在基本类实例化之前该类已经被实例化 @Configuration public class RabbitConfig { /** * @Desc: 配置一个消息队列(routingKey=q_hello) */ @Bean public Queue queue() { return new Queue("q_hello"); } /** * @Desc: 配置一个消息队列(routingKey=notify.refund) */ @Bean public Queue refundNotifyQueue() { return new Queue("notify.refund"); } /** * @Desc: 配置一个消息队列(routingKey=query.order) 测试RPC */ @Bean public Queue queryOrderQueue() { return new Queue("query.order"); } }
生产者:
package com.wjy.direct; import com.wjy.mojo.Order; import org.springframework.amqp.core.AmqpTemplate; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component; import java.text.SimpleDateFormat; import java.util.Date; /** * @Desc: 生产者 */ @Component public class MqSender { @Autowired private AmqpTemplate rabbitTemplate; /** * @Desc: 将消息发送至默认的交换机且routingKey为q_hello */ public void send() { //24小时制 String date = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(new Date()); String context = "hello " + date; System.err.println("Sender : " + context); //简单对列的情况下routingKey即为Q名 this.rabbitTemplate.convertAndSend("q_hello", context); } /** * @Desc: 将消息发送至默认的交换机且routingKey为q_hello */ public void send(String i) { //24小时制 String date = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(new Date()); String context = "hello " + i + " " + date; System.err.println("Sender : " + context); //简单对列的情况下routingKey即为Q名 this.rabbitTemplate.convertAndSend("q_hello", context); } /** * @Desc: 将发送对象 */ public void sender(Order order){ System.err.println("notify.refund send message: "+order); rabbitTemplate.convertAndSend("notify.refund", order); } /** * @Desc: 测试RPC */ public void sender(String orderId){ System.err.println("query.order send message: "+orderId); Order order = (Order) rabbitTemplate.convertSendAndReceive("query.order", orderId); System.err.println("query.order return message: "+order); } }