MQ的作用:

  1. 解耦:在项目启动之初是很难预测未来会遇到什么困难的,消息中间件在处理过程中插入了一个隐含的,基于数据的接口层,两边都实现这个接口,这样就允许独立的修改或者扩展两边的处理过程,只要两边遵守相同的接口约束即可。
  2. 冗余(存储):在某些情况下处理数据的过程中会失败,消息中间件允许把数据持久化知道他们完全被处理
  3. 扩展性:消息中间件解耦了应用的过程,所以提供消息入队和处理的效率是很容易的,只需要增加处理流程就可以了。
  4. 削峰:在访问量剧增的情况下,但是应用仍然需要发挥作用,但是这样的突发流量并不常见。而使用消息中间件采用队列的形式可以减少突发访问压力,不会因为突发的超时负荷要求而崩溃
  5. 可恢复性:当系统一部分组件失效时,不会影响到整个系统。消息中间件降低了进程间的耦合性,当一个处理消息的进程挂掉后,加入消息中间件的消息仍然可以在系统恢复后重新处理
  6. 顺序保证:在大多数场景下,处理数据的顺序也很重要,大部分消息中间件支持一定的顺序性
  7. 缓冲:消息中间件通过一个缓冲层来帮助任务最高效率的执行
  8. 异步通信:通过把把消息发送给消息中间件,消息中间件并不立即处理它,后续在慢慢处理。

RabbitMQ:

它是采用Erlang语言实现的AMQP(Advanced Message Queued Protocol)的消息中间件,最初起源于金融系统,用在分布式系统存储转发消息。

RabbitMQ发展到今天,被越来越多的人认可,这和它在易用性、扩展性、可靠性和高可用性等方面的卓著表现是分不开的。RabbitMQ的具体特点可以概括为以下几点:

  1. 可靠性:RabbitMQ使用一些机制来保证可靠性,如持久化、传输确认及发布确认等。
  2. 灵活的路由:在消息进入队列之前,通过交换器来路由消息。对于典型的路由功能,RabbitMQ己经提供了一些内置的交换器来实现。针对更复杂的路由功能,可以将多个交换器绑定在一起,也可以通过插件机制来实现自己的交换器。
  3. 扩展性:多个RabbitMQ节点可以组成一个集群,也可以根据实际业务情况动态地扩展集群中节点。
  4. 高可用性:队列可以在集群中的机器上设置镜像,使得在部分节点出现问题的情况下队仍然可用。
  5. 多种协议:RabbitMQ除了原生支持AMQP协议,还支持STOMP,MQTT等多种消息中间件协议。
  6. 多语言客户端:RabbitMQ几乎支持所有常用语言,比如Jav a、Python、Ruby、PHP、C#、JavaScript等。
  7. 管理界面:RabbitMQ提供了一个易用的用户界面,使得用户可以监控和管理消息、集群中的节点等。
  8. 插件机制:RabbitMQ提供了许多插件,以实现从多方面进行扩展,当然也可以编写自己的插件。

参考原文:https://blog.csdn.net/weixin_40792878/article/details/82555791

系统架构

Rabbitmq系统最核心的组件是Exchange和Queue,下图是系统简单的示意图。Exchange和Queue是在rabbitmq server(又叫做broker)端,producer和consumer在应用端。

RabbitMQ的用法整合

producer&Consumer

producer指的是消息生产者,consumer消息的消费者。

Queue

消息队列,提供了FIFO的处理机制,具有缓存消息的能力。rabbitmq中,队列消息可以设置为持久化,临时或者自动删除。

  1. 设置为持久化的队列,queue中的消息会在server本地硬盘存储一份,防止系统crash,数据丢失
  2. 设置为临时队列,queue中的数据在系统重启之后就会丢失
  3. 设置为自动删除的队列,当不存在用户连接到server,队列中的数据会被自动删除

Exchange

Exchange类似于数据通信网络中的交换机,提供消息路由策略。rabbitmq中,producer不是通过信道直接将消息发送给queue,而是先发送给Exchange。一个Exchange可以和多个Queue进行绑定,producer在传递消息的时候,会传递一个ROUTING_KEY,Exchange会根据这个ROUTING_KEY按照特定的路由算法,将消息路由给指定的queue。和Queue一样,Exchange也可设置为持久化,临时或者自动删除。

Exchange有4种类型:direct(默认),fanout, topic, 和headers,不同类型的Exchange转发消息的策略有所区别:

  1. Direct

    直接交换器,工作方式类似于单播,Exchange会将消息发送完全匹配ROUTING_KEY的Queue

  2. fanout

    广播是式交换器,不管消息的ROUTING_KEY设置为什么,Exchange都会将消息转发给所有绑定的Queue。

  3. topic

    主题交换器,工作方式类似于组播,Exchange会将消息转发和ROUTING_KEY匹配模式相同的所有队列,比如,ROUTING_KEY为user.stock的Message会转发给绑定匹配模式为 * .stock,user.stock, * . * 和#.user.stock.#的队列。( * 表是匹配一个任意词组,#表示匹配0个或多个词组)

  4. headers
    消息体的header匹配(ignore)

Binding

所谓绑定就是将一个特定的 Exchange 和一个特定的 Queue 绑定起来。Exchange 和Queue的绑定可以是多对多的关系。

virtual host

在rabbitmq server上可以创建多个虚拟的message broker,又叫做virtual hosts (vhosts)。每一个vhost本质上是一个mini-rabbitmq server,分别管理各自的exchange,和bindings。vhost相当于物理的server,可以为不同app提供边界隔离,使得应用安全的运行在不同的vhost实例上,相互之间不会干扰。producer和consumer连接rabbit server需要指定一个vhost。

通信过程

假设P1和C1注册了相同的Broker,Exchange和Queue。P1发送的消息最终会被C1消费。基本的通信流程大概如下所示:

  1. P1生产消息,发送给服务器端的Exchange
  2. Exchange收到消息,根据ROUTINKEY,将消息转发给匹配的Queue1
  3. Queue1收到消息,将消息发送给订阅者C1
  4. C1收到消息,发送ACK给队列确认收到消息
  5. Queue1收到ACK,删除队列中缓存的此条消息

Consumer收到消息时需要显式的向rabbit broker发送basic.ack消息或者consumer订阅消息时设置auto_ack参数为true。在通信过程中,队列对ACK的处理有以下几种情况:

  1. 如果consumer接收了消息,发送ack,rabbitmq会删除队列中这个消息,发送另一条消息给consumer。
  2. 如果cosumer接受了消息,
    但在发送ack之前断开连接,rabbitmq会认为这条消息没有被deliver,在consumer在次连接的时候,这条消息会被redeliver。
  3. 如果consumer接受了消息,但是程序中有bug,忘记了ack,rabbitmq不会重复发送消息。
  4. rabbitmq2.0.0和之后的版本支持consumer
    reject某条(类)消息,可以通过设置requeue参数中的reject为true达到目地,那么rabbitmq将会把消息发送给下一个注册的consumer。

spring-boot-starter-amqp

高级消息队列协议(AMQP)是面向消息中间件的平台中立的有线协议。Spring AMQP项目将核心Spring概念应用于基于AMQP的消息传递解决方案的开发。Spring Boot为通过RabbitMQ与AMQP一起工作提供了一些便利,包括spring-boot-starter-amqp “Starter”。

springboot集成RabbitMQ非常简单,如果只是简单的使用配置非常少,springboot提供了spring-boot-starter-amqp项目对消息各种支持。

添加依赖

 <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-amqp</artifactId>
</dependency>

RabbitMQ是基于AMQP协议的轻量级,可靠,可扩展,可移植的消息代理。Spring使用RabbitMQ通过AMQP协议进行通信。

属性配置

RabbitMQ配置由外部配置属性控制 spring.rabbitmq.*。例如,您可以在以下部分声明以下部分 application.properties:

spring.rabbitmq.host = localhost
spring.rabbitmq.port = 5672
spring.rabbitmq.username = guest
spring.rabbitmq.password = guest

快速上手

1.队列配置

package com.example.rabbitmqdemo.config;
 
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

/**
 * RabbitMQ 配置类
 *
 * @author itguang
 **/
@Configuration
public class RabbitConfig {

    @Bean
    public Queue queue(){
        return new Queue("hello");
    }
}

2 发送者
rabbitTemplate是springboot 提供的默认实现.

package com.example.rabbitmqdemo.rabbitmq;

import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

import java.time.LocalDate;
import java.time.LocalDateTime;
import java.time.LocalTime;
import java.util.Date;

/**
 * 消息发送者
 *
 * @author itguang
 * @create 2018-04-21 10:46
 **/

@Component
public class HelloSender {


    @Autowired
    private AmqpTemplate amqpTemplate;


    public void send(){
        String context = "hello----"+LocalDateTime.now();
        System.out.println("send:"+context);
        //往名称为 hello 的queue中发送消息
        this.amqpTemplate.convertAndSend("hello",context);
    }

}

3 接收者

package com.example.rabbitmqdemo.rabbitmq;

import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

/**
 * 消息接受者
 *
 * @author itguang
 * @create 2018-04-21 10:50
 **/

@Component
@RabbitListener(queues = "hello") //监听 名称为 hello 的queue
public class HelloReceiver {

    //消息处理器
    @RabbitHandler
    public void process(String message){
        System.out.println("Receiver:"+message);

    }
}

测试

package com.example.rabbitmqdemo;

import com.example.rabbitmqdemo.rabbitmq.HelloSender;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;

@RunWith(SpringRunner.class)
@SpringBootTest
public class RabbitmqdemoApplicationTests {

    @Autowired
    HelloSender helloSender;

    @Test
    public void contextLoads() {
        helloSender.send();

    }
}

查看控制台输出结果

send:hello----2018-04-21T11:29:47.739
Receiver:hello----2018-04-21T11:29:47.739

一对多发送:一个发送者多个接受者

对上面的代码进行了小改造,接收端注册了两个Receiver,Receiver1和Receiver2,发送端加入参数计数,接收端打印接收到的参数,下面是测试代码,发送一百条消息,来观察两个接收端的执行效果

添加一个队列叫 hello2

package com.example.rabbitmqdemo.config;

import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

/**
 * RabbitMQ 配置类
 *
 * @author itguang
 * @create 2018-04-21 10:24
 **/
@Configuration
public class RabbitConfig {



    @Bean
    public Queue queue(){
        return new Queue("hello");
    }

    @Bean
    public Queue queue2(){
        return new Queue("hello2");
    }    
}

给队列 hello2 发送消息,接受一个计数参数

package com.example.rabbitmqdemo.rabbitmq;

import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

import java.time.LocalDate;
import java.time.LocalDateTime;
import java.time.LocalTime;
import java.util.Date;

/**
 * 消息发送者
 *
 * @author itguang
 * @create 2018-04-21 10:46
 **/

@Component
public class HelloSender {


    @Autowired
    private AmqpTemplate amqpTemplate;


    public void send(){
        String context = "hello----"+LocalDateTime.now();
        System.out.println("send:"+context);
        this.amqpTemplate.convertAndSend("hello",context);
    }

    //给hello2发送消息,并接受一个计数参数
    public void send2(int i){
        String context = i+"";
        System.out.println(context+"--send:");
        this.amqpTemplate.convertAndSend("hello2",context);
    }
}

两个hello2 的接受者

@Component
@RabbitListener(queues = "hello2")
public class HelloReceiver1 {

    @RabbitHandler
    public void process(String message){

        System.out.println("Receiver1:"+message);
    }
}



@Component
@RabbitListener(queues = "hello2")
public class HelloReceiver2 {

    @RabbitHandler
    public void process(String message){
        System.out.println("Receiver2:"+message);
    }
}

测试

@Test
    public void manyReceiver(){
        for (int i=0;i<100;i++){
            helloSender.send2(i);
        }

    }

查看控制台输出结果:

0--send:
1--send:
2--send:
3--send:
4--send:

...(省略)

58--send:
59--send:
60--send:
61--send:
62--send:
63--send:
Receiver2:1
Receiver1:0
64--send:
65--send:
Receiver1:2
Receiver2:3
66--send:
Receiver1:4
Receiver2:5
...(省略)

可以看到:在消息发送到63时,接受者Receiver已经收到了消息,

结论:一个发送者,N个接受者,经过测试会均匀的将消息发送到N个接收者中

多对多: 多个发送者对多个接受者

我们可以注入两个发送者,放在循环中,如下:

 @Test
    public void many2many(){
      for (int i=0;i<100;i++){
          helloSender.send2(i);
          helloSender2.send2(i);

      }
    }

运行单元测试,查看控制台输出:

0--send:
0--send:
1--send:
1--send:
2--send:
2--send:
3--send:
3--send:

...(省略)

22--send:
22--send:
23--send:
23--send:
24--send:
24--send:
Receiver2:0
25--send:
25--send:
Receiver2:1
26--send:
Receiver2:2
26--send:
Receiver2:3
27--send:
Receiver1:0
27--send:
Receiver2:4
Receiver1:1
28--send:
Receiver2:5
Receiver1:2
28--send:
Receiver2:6
Receiver1:3
29--send:
Receiver2:7
Receiver1:4
29--send:
Receiver2:8
Receiver1:5
30--send:
Receiver2:9
Receiver1:6
30--send:
31--send:
31--send:
32--send:
32--send:

结论:和一对多一样,接收端仍然会均匀接收到消息

发送对象

首先我们创建一个实体类对象 User,注意必须实现 Serializable 接口.

package com.example.rabbitmqdemo.pojo;

import java.io.Serializable;

/**
 * @author itguang
 * @create 2018-04-21 15:46
 **/
public class User implements Serializable {


    private String username;
    private String password;

    public User(String username, String password) {
        this.username = username;
        this.password = password;
    }

    public String getUsername() {
        return username;
    }

    public void setUsername(String username) {
        this.username = username;
    }

    public String getPassword() {
        return password;
    }

    public void setPassword(String password) {
        this.password = password;
    }

    @Override
    public String toString() {
        return "User{" +
                "username='" + username + '\'' +
                ", password='" + password + '\'' +
                '}';
    }
}

然后在配置文件中再创建一个队列,叫 object_queue

@Bean
    public Queue queue3(){
        return new Queue("object_queue");
    }

接下里就是User对象的两个发送者ObjectSender和接受者ObjectReceiver:

@Component
public class ObjectSender {
@Autowired
AmqpTemplate amqpTemplate;

public void sendUser(User user){

    System.out.println("Send object:"+user.toString());
    this.amqpTemplate.convertAndSend("object_queue",user);

 }
}

@Component
@RabbitListener(queues = "object_queue")
public class ObjectReceiver {

   @RabbitHandler
    public void objectReceiver(User user){

   System.out.println("Receiver object:"+user.toString());

   }
}

运行单元测试,查看控制台输出结果:

Send object:User{username='李增光', password='666666'}
Receiver object:User{username='李增光', password='666666'}

Topic Exchange

topic 是RabbitMQ中最灵活的一种方式,可以根据routing_key自由的绑定不同的队列

首先对topic规则配置,这里使用两个队列来测试

package com.example.rabbitmqdemo.config;

import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.core.TopicExchange;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

/**
 * @author itguang
 * @create 2018-04-21 16:10
 **/
@Configuration
public class TopicRabbitConfig {


    final static String message = "topic.message";
    final static String messages = "topic.messages";


    //创建两个 Queue
    @Bean
    public Queue queueMessage(){
        return new Queue(TopicRabbitConfig.message);
    }

    @Bean
    public Queue queueMessages(){
        return new Queue(TopicRabbitConfig.messages);
    }

    //配置 TopicExchange,指定名称为 topicExchange
    @Bean
    public TopicExchange exchange(){
        return new TopicExchange("topicExchange");
    }

    //给队列绑定 exchange 和 routing_key

    @Bean
    public Binding bindingExchangeMessage(Queue queueMessage, TopicExchange exchange){
        return BindingBuilder.bind(queueMessage).to(exchange).with("topic.message");
    }

    @Bean
    public Binding bingingExchangeMessages(Queue queueMessages,TopicExchange exchange){
        return BindingBuilder.bind(queueMessages).to(exchange).with("topic.#");
    }
}

消息发送者:都是用topicExchange,并且绑定到不同的 routing_key

package com.example.rabbitmqdemo.topic;

import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

/**
 * @author itguang
 * @create 2018-04-21 16:26
 **/
@Component
public class TopicSender {

    @Autowired
    AmqpTemplate amqpTemplate;

    public void send1(){
        String context = "hi, i am message 1";
        System.out.println("Sender : " + context);
        amqpTemplate.convertAndSend("topicExchange","topic.message",context);
    }

    public void send2() {
        String context = "hi, i am messages 2";
        System.out.println("Sender : " + context);
        amqpTemplate.convertAndSend("topicExchange", "topic.messages", context);
    }
}

两个消息接受者,分别指定不同的 queue

package com.example.rabbitmqdemo.topic;

import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

/**
 * @author itguang
 * @create 2018-04-21 16:34
 **/
@Component
@RabbitListener(queues = "topic.message")
public class TopicReceiver1 {

    @RabbitHandler
    public void process(String message){

        System.out.println("Receiver topic.message :"+ message);

    }

}

package com.example.rabbitmqdemo.topic;

import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

/**
 * @author itguang
 * @create 2018-04-21 16:34
 **/
@Component
@RabbitListener(queues = "topic.messages")
public class TopicReceiver2 {

    @RabbitHandler
    public void process(String message){

        System.out.println("Receiver topic.messages: "+ message);

    }

}

测试:

发送send1会匹配到topic.#和topic.message 两个Receiver都可以收到消息,发送send2只有topic.#可以匹配所有只有Receiver2监听到消息

Fanout Exchange

Fanout 就是我们熟悉的广播模式或者订阅模式,给Fanout交换机发送消息,绑定了这个交换机的所有队列都收到这个消息。

Fanout 相关配置:

package com.example.rabbitmqdemo.config;

import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.FanoutExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import java.security.PublicKey;

/**
 * @author itguang
 * @create 2018-04-21 17:03
 **/
@Configuration
public class FanOutRabbitMq {


    //创建三个队列

    @Bean
    public Queue AMessage() {
        return new Queue("fanout.A");
    }

    @Bean
    public Queue BMessage() {
        return new Queue("fanout.B");
    }

    @Bean
    public Queue CMessage() {
        return new Queue("fanout.C");
    }


    //创建exchange,指定交换策略

    @Bean
    public FanoutExchange fanoutExchange() {

        return new FanoutExchange("fanoutExchange");
    }


    //分别给三个队列指定exchange,这里使用了A、B、C三个队列绑定到Fanout交换机上面,发送端的routing_key写任何字符都会被忽略:

    @Bean
    public Binding bindingExchangeA(Queue AMessage,FanoutExchange fanoutExchange){
        return BindingBuilder.bind(AMessage).to(fanoutExchange);

    }

    @Bean
    public Binding bindingExchangeB(Queue BMessage,FanoutExchange fanoutExchange){
        return BindingBuilder.bind(BMessage).to(fanoutExchange);

    }

    @Bean
    Binding bindingExchangeC(Queue CMessage, FanoutExchange fanoutExchange) {
        return BindingBuilder.bind(CMessage).to(fanoutExchange);
    }


}

消息发送者:

这里使用了A、B、C三个队列绑定到Fanout交换机上面,发送端的routing_key写任何字符都会被忽略

package com.example.rabbitmqdemo.fanout;

import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

/**
 * @author itguang
 * @create 2018-04-21 17:13
 **/
@Component
public class FanoutSender {


    @Autowired
    AmqpTemplate amqpTemplate;


    public void send(){

        String context = "hi, fanout msg ";
        System.out.println("Sender : " + context);
        //这里使用了A、B、C三个队列绑定到Fanout交换机上面,发送端的routing_key写任何字符都会被忽略:
        amqpTemplate.convertAndSend("fanoutExchange","", context);

    }

}

三个消息接受者:

@Component
@RabbitListener(queues = "fanout.A")
public class FanoutReceiverA {


    @RabbitHandler
    public void process(String message){

        System.out.println("Receiver form fanout.A: "+message);

    }

}

@Component
@RabbitListener(queues = "fanout.B")
public class FanoutReceiverB {


    @RabbitHandler
    public void process(String message){

        System.out.println("Receiver form fanout.B: "+message);

    }

}

@Component
@RabbitListener(queues = "fanout.C")
public class FanoutReceiverC {


    @RabbitHandler
    public void process(String message){

        System.out.println("Receiver form fanout.C: "+message);

    }

}

运行单元测试,查看结果:

Sender : hi, fanout msg 

Receiver form fanout.C: hi, fanout msg 
Receiver form fanout.A: hi, fanout msg 
Receiver form fanout.B: hi, fanout msg 

结果说明,绑定到fanout交换机上面的队列都收到了消息.

原文:https://blog.csdn.net/itguangit/article/details/80031595

相关文章:

  • 2021-10-11
  • 2021-05-10
  • 2021-09-03
  • 2021-07-16
  • 2021-10-24
  • 2021-11-13
  • 2021-04-04
猜你喜欢
  • 2022-02-23
  • 2022-12-23
  • 2022-12-23
  • 2021-08-18
  • 2021-04-08
  • 2022-12-23
  • 2021-12-04
相关资源
相似解决方案