MQ的作用:
- 解耦:在项目启动之初是很难预测未来会遇到什么困难的,消息中间件在处理过程中插入了一个隐含的,基于数据的接口层,两边都实现这个接口,这样就允许独立的修改或者扩展两边的处理过程,只要两边遵守相同的接口约束即可。
- 冗余(存储):在某些情况下处理数据的过程中会失败,消息中间件允许把数据持久化知道他们完全被处理
- 扩展性:消息中间件解耦了应用的过程,所以提供消息入队和处理的效率是很容易的,只需要增加处理流程就可以了。
- 削峰:在访问量剧增的情况下,但是应用仍然需要发挥作用,但是这样的突发流量并不常见。而使用消息中间件采用队列的形式可以减少突发访问压力,不会因为突发的超时负荷要求而崩溃
- 可恢复性:当系统一部分组件失效时,不会影响到整个系统。消息中间件降低了进程间的耦合性,当一个处理消息的进程挂掉后,加入消息中间件的消息仍然可以在系统恢复后重新处理
- 顺序保证:在大多数场景下,处理数据的顺序也很重要,大部分消息中间件支持一定的顺序性
- 缓冲:消息中间件通过一个缓冲层来帮助任务最高效率的执行
- 异步通信:通过把把消息发送给消息中间件,消息中间件并不立即处理它,后续在慢慢处理。
RabbitMQ:
它是采用Erlang语言实现的AMQP(Advanced Message Queued Protocol)的消息中间件,最初起源于金融系统,用在分布式系统存储转发消息。
RabbitMQ发展到今天,被越来越多的人认可,这和它在易用性、扩展性、可靠性和高可用性等方面的卓著表现是分不开的。RabbitMQ的具体特点可以概括为以下几点:
- 可靠性:RabbitMQ使用一些机制来保证可靠性,如持久化、传输确认及发布确认等。
- 灵活的路由:在消息进入队列之前,通过交换器来路由消息。对于典型的路由功能,RabbitMQ己经提供了一些内置的交换器来实现。针对更复杂的路由功能,可以将多个交换器绑定在一起,也可以通过插件机制来实现自己的交换器。
- 扩展性:多个RabbitMQ节点可以组成一个集群,也可以根据实际业务情况动态地扩展集群中节点。
- 高可用性:队列可以在集群中的机器上设置镜像,使得在部分节点出现问题的情况下队仍然可用。
- 多种协议:RabbitMQ除了原生支持AMQP协议,还支持STOMP,MQTT等多种消息中间件协议。
- 多语言客户端:RabbitMQ几乎支持所有常用语言,比如Jav a、Python、Ruby、PHP、C#、JavaScript等。
- 管理界面:RabbitMQ提供了一个易用的用户界面,使得用户可以监控和管理消息、集群中的节点等。
- 插件机制:RabbitMQ提供了许多插件,以实现从多方面进行扩展,当然也可以编写自己的插件。
参考原文:https://blog.csdn.net/weixin_40792878/article/details/82555791
系统架构
Rabbitmq系统最核心的组件是Exchange和Queue,下图是系统简单的示意图。Exchange和Queue是在rabbitmq server(又叫做broker)端,producer和consumer在应用端。
producer&Consumer
producer指的是消息生产者,consumer消息的消费者。
Queue
消息队列,提供了FIFO的处理机制,具有缓存消息的能力。rabbitmq中,队列消息可以设置为持久化,临时或者自动删除。
- 设置为持久化的队列,queue中的消息会在server本地硬盘存储一份,防止系统crash,数据丢失
- 设置为临时队列,queue中的数据在系统重启之后就会丢失
- 设置为自动删除的队列,当不存在用户连接到server,队列中的数据会被自动删除
Exchange
Exchange类似于数据通信网络中的交换机,提供消息路由策略。rabbitmq中,producer不是通过信道直接将消息发送给queue,而是先发送给Exchange。一个Exchange可以和多个Queue进行绑定,producer在传递消息的时候,会传递一个ROUTING_KEY,Exchange会根据这个ROUTING_KEY按照特定的路由算法,将消息路由给指定的queue。和Queue一样,Exchange也可设置为持久化,临时或者自动删除。
Exchange有4种类型:direct(默认),fanout, topic, 和headers,不同类型的Exchange转发消息的策略有所区别:
-
Direct
直接交换器,工作方式类似于单播,Exchange会将消息发送完全匹配ROUTING_KEY的Queue
-
fanout
广播是式交换器,不管消息的ROUTING_KEY设置为什么,Exchange都会将消息转发给所有绑定的Queue。
-
topic
主题交换器,工作方式类似于组播,Exchange会将消息转发和ROUTING_KEY匹配模式相同的所有队列,比如,ROUTING_KEY为user.stock的Message会转发给绑定匹配模式为 * .stock,user.stock, * . * 和#.user.stock.#的队列。( * 表是匹配一个任意词组,#表示匹配0个或多个词组)
-
headers
消息体的header匹配(ignore)
Binding
所谓绑定就是将一个特定的 Exchange 和一个特定的 Queue 绑定起来。Exchange 和Queue的绑定可以是多对多的关系。
virtual host
在rabbitmq server上可以创建多个虚拟的message broker,又叫做virtual hosts (vhosts)。每一个vhost本质上是一个mini-rabbitmq server,分别管理各自的exchange,和bindings。vhost相当于物理的server,可以为不同app提供边界隔离,使得应用安全的运行在不同的vhost实例上,相互之间不会干扰。producer和consumer连接rabbit server需要指定一个vhost。
通信过程
假设P1和C1注册了相同的Broker,Exchange和Queue。P1发送的消息最终会被C1消费。基本的通信流程大概如下所示:
- P1生产消息,发送给服务器端的Exchange
- Exchange收到消息,根据ROUTINKEY,将消息转发给匹配的Queue1
- Queue1收到消息,将消息发送给订阅者C1
- C1收到消息,发送ACK给队列确认收到消息
- Queue1收到ACK,删除队列中缓存的此条消息
Consumer收到消息时需要显式的向rabbit broker发送basic.ack消息或者consumer订阅消息时设置auto_ack参数为true。在通信过程中,队列对ACK的处理有以下几种情况:
- 如果consumer接收了消息,发送ack,rabbitmq会删除队列中这个消息,发送另一条消息给consumer。
- 如果cosumer接受了消息,
但在发送ack之前断开连接,rabbitmq会认为这条消息没有被deliver,在consumer在次连接的时候,这条消息会被redeliver。 - 如果consumer接受了消息,但是程序中有bug,忘记了ack,rabbitmq不会重复发送消息。
- rabbitmq2.0.0和之后的版本支持consumer
reject某条(类)消息,可以通过设置requeue参数中的reject为true达到目地,那么rabbitmq将会把消息发送给下一个注册的consumer。
spring-boot-starter-amqp
高级消息队列协议(AMQP)是面向消息中间件的平台中立的有线协议。Spring AMQP项目将核心Spring概念应用于基于AMQP的消息传递解决方案的开发。Spring Boot为通过RabbitMQ与AMQP一起工作提供了一些便利,包括spring-boot-starter-amqp “Starter”。
springboot集成RabbitMQ非常简单,如果只是简单的使用配置非常少,springboot提供了spring-boot-starter-amqp项目对消息各种支持。
添加依赖
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
RabbitMQ是基于AMQP协议的轻量级,可靠,可扩展,可移植的消息代理。Spring使用RabbitMQ通过AMQP协议进行通信。
属性配置
RabbitMQ配置由外部配置属性控制 spring.rabbitmq.*。例如,您可以在以下部分声明以下部分 application.properties:
spring.rabbitmq.host = localhost
spring.rabbitmq.port = 5672
spring.rabbitmq.username = guest
spring.rabbitmq.password = guest
快速上手
1.队列配置
package com.example.rabbitmqdemo.config;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
/**
* RabbitMQ 配置类
*
* @author itguang
**/
@Configuration
public class RabbitConfig {
@Bean
public Queue queue(){
return new Queue("hello");
}
}
2 发送者
rabbitTemplate是springboot 提供的默认实现.
package com.example.rabbitmqdemo.rabbitmq;
import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import java.time.LocalDate;
import java.time.LocalDateTime;
import java.time.LocalTime;
import java.util.Date;
/**
* 消息发送者
*
* @author itguang
* @create 2018-04-21 10:46
**/
@Component
public class HelloSender {
@Autowired
private AmqpTemplate amqpTemplate;
public void send(){
String context = "hello----"+LocalDateTime.now();
System.out.println("send:"+context);
//往名称为 hello 的queue中发送消息
this.amqpTemplate.convertAndSend("hello",context);
}
}
3 接收者
package com.example.rabbitmqdemo.rabbitmq;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
/**
* 消息接受者
*
* @author itguang
* @create 2018-04-21 10:50
**/
@Component
@RabbitListener(queues = "hello") //监听 名称为 hello 的queue
public class HelloReceiver {
//消息处理器
@RabbitHandler
public void process(String message){
System.out.println("Receiver:"+message);
}
}
测试
package com.example.rabbitmqdemo;
import com.example.rabbitmqdemo.rabbitmq.HelloSender;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;
@RunWith(SpringRunner.class)
@SpringBootTest
public class RabbitmqdemoApplicationTests {
@Autowired
HelloSender helloSender;
@Test
public void contextLoads() {
helloSender.send();
}
}
查看控制台输出结果
send:hello----2018-04-21T11:29:47.739
Receiver:hello----2018-04-21T11:29:47.739
一对多发送:一个发送者多个接受者
对上面的代码进行了小改造,接收端注册了两个Receiver,Receiver1和Receiver2,发送端加入参数计数,接收端打印接收到的参数,下面是测试代码,发送一百条消息,来观察两个接收端的执行效果
添加一个队列叫 hello2
package com.example.rabbitmqdemo.config;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
/**
* RabbitMQ 配置类
*
* @author itguang
* @create 2018-04-21 10:24
**/
@Configuration
public class RabbitConfig {
@Bean
public Queue queue(){
return new Queue("hello");
}
@Bean
public Queue queue2(){
return new Queue("hello2");
}
}
给队列 hello2 发送消息,接受一个计数参数
package com.example.rabbitmqdemo.rabbitmq;
import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import java.time.LocalDate;
import java.time.LocalDateTime;
import java.time.LocalTime;
import java.util.Date;
/**
* 消息发送者
*
* @author itguang
* @create 2018-04-21 10:46
**/
@Component
public class HelloSender {
@Autowired
private AmqpTemplate amqpTemplate;
public void send(){
String context = "hello----"+LocalDateTime.now();
System.out.println("send:"+context);
this.amqpTemplate.convertAndSend("hello",context);
}
//给hello2发送消息,并接受一个计数参数
public void send2(int i){
String context = i+"";
System.out.println(context+"--send:");
this.amqpTemplate.convertAndSend("hello2",context);
}
}
两个hello2 的接受者
@Component
@RabbitListener(queues = "hello2")
public class HelloReceiver1 {
@RabbitHandler
public void process(String message){
System.out.println("Receiver1:"+message);
}
}
@Component
@RabbitListener(queues = "hello2")
public class HelloReceiver2 {
@RabbitHandler
public void process(String message){
System.out.println("Receiver2:"+message);
}
}
测试
@Test
public void manyReceiver(){
for (int i=0;i<100;i++){
helloSender.send2(i);
}
}
查看控制台输出结果:
0--send:
1--send:
2--send:
3--send:
4--send:
...(省略)
58--send:
59--send:
60--send:
61--send:
62--send:
63--send:
Receiver2:1
Receiver1:0
64--send:
65--send:
Receiver1:2
Receiver2:3
66--send:
Receiver1:4
Receiver2:5
...(省略)
可以看到:在消息发送到63时,接受者Receiver已经收到了消息,
结论:一个发送者,N个接受者,经过测试会均匀的将消息发送到N个接收者中
多对多: 多个发送者对多个接受者
我们可以注入两个发送者,放在循环中,如下:
@Test
public void many2many(){
for (int i=0;i<100;i++){
helloSender.send2(i);
helloSender2.send2(i);
}
}
运行单元测试,查看控制台输出:
0--send:
0--send:
1--send:
1--send:
2--send:
2--send:
3--send:
3--send:
...(省略)
22--send:
22--send:
23--send:
23--send:
24--send:
24--send:
Receiver2:0
25--send:
25--send:
Receiver2:1
26--send:
Receiver2:2
26--send:
Receiver2:3
27--send:
Receiver1:0
27--send:
Receiver2:4
Receiver1:1
28--send:
Receiver2:5
Receiver1:2
28--send:
Receiver2:6
Receiver1:3
29--send:
Receiver2:7
Receiver1:4
29--send:
Receiver2:8
Receiver1:5
30--send:
Receiver2:9
Receiver1:6
30--send:
31--send:
31--send:
32--send:
32--send:
结论:和一对多一样,接收端仍然会均匀接收到消息
发送对象
首先我们创建一个实体类对象 User,注意必须实现 Serializable 接口.
package com.example.rabbitmqdemo.pojo;
import java.io.Serializable;
/**
* @author itguang
* @create 2018-04-21 15:46
**/
public class User implements Serializable {
private String username;
private String password;
public User(String username, String password) {
this.username = username;
this.password = password;
}
public String getUsername() {
return username;
}
public void setUsername(String username) {
this.username = username;
}
public String getPassword() {
return password;
}
public void setPassword(String password) {
this.password = password;
}
@Override
public String toString() {
return "User{" +
"username='" + username + '\'' +
", password='" + password + '\'' +
'}';
}
}
然后在配置文件中再创建一个队列,叫 object_queue
@Bean
public Queue queue3(){
return new Queue("object_queue");
}
接下里就是User对象的两个发送者ObjectSender和接受者ObjectReceiver:
@Component
public class ObjectSender {
@Autowired
AmqpTemplate amqpTemplate;
public void sendUser(User user){
System.out.println("Send object:"+user.toString());
this.amqpTemplate.convertAndSend("object_queue",user);
}
}
@Component
@RabbitListener(queues = "object_queue")
public class ObjectReceiver {
@RabbitHandler
public void objectReceiver(User user){
System.out.println("Receiver object:"+user.toString());
}
}
运行单元测试,查看控制台输出结果:
Send object:User{username='李增光', password='666666'}
Receiver object:User{username='李增光', password='666666'}
Topic Exchange
topic 是RabbitMQ中最灵活的一种方式,可以根据routing_key自由的绑定不同的队列
首先对topic规则配置,这里使用两个队列来测试
package com.example.rabbitmqdemo.config;
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.core.TopicExchange;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
/**
* @author itguang
* @create 2018-04-21 16:10
**/
@Configuration
public class TopicRabbitConfig {
final static String message = "topic.message";
final static String messages = "topic.messages";
//创建两个 Queue
@Bean
public Queue queueMessage(){
return new Queue(TopicRabbitConfig.message);
}
@Bean
public Queue queueMessages(){
return new Queue(TopicRabbitConfig.messages);
}
//配置 TopicExchange,指定名称为 topicExchange
@Bean
public TopicExchange exchange(){
return new TopicExchange("topicExchange");
}
//给队列绑定 exchange 和 routing_key
@Bean
public Binding bindingExchangeMessage(Queue queueMessage, TopicExchange exchange){
return BindingBuilder.bind(queueMessage).to(exchange).with("topic.message");
}
@Bean
public Binding bingingExchangeMessages(Queue queueMessages,TopicExchange exchange){
return BindingBuilder.bind(queueMessages).to(exchange).with("topic.#");
}
}
消息发送者:都是用topicExchange,并且绑定到不同的 routing_key
package com.example.rabbitmqdemo.topic;
import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
/**
* @author itguang
* @create 2018-04-21 16:26
**/
@Component
public class TopicSender {
@Autowired
AmqpTemplate amqpTemplate;
public void send1(){
String context = "hi, i am message 1";
System.out.println("Sender : " + context);
amqpTemplate.convertAndSend("topicExchange","topic.message",context);
}
public void send2() {
String context = "hi, i am messages 2";
System.out.println("Sender : " + context);
amqpTemplate.convertAndSend("topicExchange", "topic.messages", context);
}
}
两个消息接受者,分别指定不同的 queue
package com.example.rabbitmqdemo.topic;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
/**
* @author itguang
* @create 2018-04-21 16:34
**/
@Component
@RabbitListener(queues = "topic.message")
public class TopicReceiver1 {
@RabbitHandler
public void process(String message){
System.out.println("Receiver topic.message :"+ message);
}
}
package com.example.rabbitmqdemo.topic;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
/**
* @author itguang
* @create 2018-04-21 16:34
**/
@Component
@RabbitListener(queues = "topic.messages")
public class TopicReceiver2 {
@RabbitHandler
public void process(String message){
System.out.println("Receiver topic.messages: "+ message);
}
}
测试:
发送send1会匹配到topic.#和topic.message 两个Receiver都可以收到消息,发送send2只有topic.#可以匹配所有只有Receiver2监听到消息
Fanout Exchange
Fanout 就是我们熟悉的广播模式或者订阅模式,给Fanout交换机发送消息,绑定了这个交换机的所有队列都收到这个消息。
Fanout 相关配置:
package com.example.rabbitmqdemo.config;
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.FanoutExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import java.security.PublicKey;
/**
* @author itguang
* @create 2018-04-21 17:03
**/
@Configuration
public class FanOutRabbitMq {
//创建三个队列
@Bean
public Queue AMessage() {
return new Queue("fanout.A");
}
@Bean
public Queue BMessage() {
return new Queue("fanout.B");
}
@Bean
public Queue CMessage() {
return new Queue("fanout.C");
}
//创建exchange,指定交换策略
@Bean
public FanoutExchange fanoutExchange() {
return new FanoutExchange("fanoutExchange");
}
//分别给三个队列指定exchange,这里使用了A、B、C三个队列绑定到Fanout交换机上面,发送端的routing_key写任何字符都会被忽略:
@Bean
public Binding bindingExchangeA(Queue AMessage,FanoutExchange fanoutExchange){
return BindingBuilder.bind(AMessage).to(fanoutExchange);
}
@Bean
public Binding bindingExchangeB(Queue BMessage,FanoutExchange fanoutExchange){
return BindingBuilder.bind(BMessage).to(fanoutExchange);
}
@Bean
Binding bindingExchangeC(Queue CMessage, FanoutExchange fanoutExchange) {
return BindingBuilder.bind(CMessage).to(fanoutExchange);
}
}
消息发送者:
这里使用了A、B、C三个队列绑定到Fanout交换机上面,发送端的routing_key写任何字符都会被忽略
package com.example.rabbitmqdemo.fanout;
import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
/**
* @author itguang
* @create 2018-04-21 17:13
**/
@Component
public class FanoutSender {
@Autowired
AmqpTemplate amqpTemplate;
public void send(){
String context = "hi, fanout msg ";
System.out.println("Sender : " + context);
//这里使用了A、B、C三个队列绑定到Fanout交换机上面,发送端的routing_key写任何字符都会被忽略:
amqpTemplate.convertAndSend("fanoutExchange","", context);
}
}
三个消息接受者:
@Component
@RabbitListener(queues = "fanout.A")
public class FanoutReceiverA {
@RabbitHandler
public void process(String message){
System.out.println("Receiver form fanout.A: "+message);
}
}
@Component
@RabbitListener(queues = "fanout.B")
public class FanoutReceiverB {
@RabbitHandler
public void process(String message){
System.out.println("Receiver form fanout.B: "+message);
}
}
@Component
@RabbitListener(queues = "fanout.C")
public class FanoutReceiverC {
@RabbitHandler
public void process(String message){
System.out.println("Receiver form fanout.C: "+message);
}
}
运行单元测试,查看结果:
Sender : hi, fanout msg
Receiver form fanout.C: hi, fanout msg
Receiver form fanout.A: hi, fanout msg
Receiver form fanout.B: hi, fanout msg
结果说明,绑定到fanout交换机上面的队列都收到了消息.
原文:https://blog.csdn.net/itguangit/article/details/80031595