本章节我们主要聊一聊RabbitMQ使用必须考虑的问题,就是消息可靠性!在生产环境下如何确保消息的可靠性投递,我们首先需要考虑两个问题
1、生产者发送消息,是否发送成功?
2、消费者接收消息如何确认以及拒绝?
当然我们所说的可靠并非一个绝对的概念,因网络、硬件、不可抗因素等;可靠性是一个相对的概念,在条件合理的范围内系统所能确保一切尽可能的趋于完美的消息可靠性;

 

我们来思考一下需要考虑哪些环节;

Send Massage(消息投递者) 在将消息发送到交换器Exchange的时候,默认RabbitMQ不进行确认投递者是不知道是否投递成功,也就是默认情况下生产者是不知道消息有没有正确地到达服务器,没有到达服务器,如果出现如:网络闪断等因素,则这条消息会无法投递到Exchange
Exchange通过RoutingKey将消息路由至Queue ,这个环节中如果无法路由至Queue队列,如何处理该消息?消息已经路由至Queue队列,却发现没有消费者,又如何处理?,是否也有一样的通知机制告诉我们?
在接收者Receive Message(消息消费者) 在接收到消息后,如何通知RabbitMQ我已经接收到该消息?是否消费者也需要一个确认告知RabbitMQ已经接收到消息?
带着这一系列问题,我们先来看看如何进行保障消息投递的确认;

二 、生产者确认

RabbitMQ针对这个问题,提供了两种解决方式;

  • 事务机制 :RabbitMQ提供了事务机制保证消息投递,RabbitMQ客户端中与事务机制相关的方法有三个: channel.txSelect 和
    channel.txCommit 和channel.txRollback

channel.txSelect : 将当前的channel通道设置为事务模式;
channel.txCommit :用于提交事务;
channel.txRollback :用于事务回滚;

但是使用事务会大大降低RabbitMQ的性能,在一些较小的吞吐量情况下,也可以采用事务方式,具体情况视各自的系统来决定,这里仅以一段代码来让大家了解事务的机制
try {
    channel.txSelect();
    channel.basicPublish(exchange , routingKey , 
    MessageProperties.PERSISTENT_TEXT_PLAIN , msg.getBytes());
    int result = 1 / 0 ;
    channel.txCommit();
}catch (Exception e) {
    e.printStackTrace();
    channel.txRollback();
}
View Code

那么,既然已经有事务了,为何还要使用发送方确认模式呢,原因是因为事务的性能是非常差的。根据相关资料,事务会降低2~10倍的性能。

 

  • **生产者确认机制 :(Publisher Confirm)机制 **

Rabbit-MQ-4 生产者确认机制 + 消费者确认和拒绝机制

 

 Rabbit-MQ-4 生产者确认机制 + 消费者确认和拒绝机制

 

、生产者将Channel设置成Confirm模式,当设置Confirm模式后所有在该信道上面发布的消息都会被指派一个唯一的ID(从1开始,ID在同个Channel范围是唯一的),一旦消息被投递到所有匹配的队列之后Broker就会发送一个确认给生产者(包含消息的唯一ID),这就使得生产者知道消息已经正确到达目的队列了;

2、如果消息和队列是可持久化的,那么确认消息会在消息写入磁盘之后发出;

3、RabbitMQ回调消息的deliveryTag包含了确认消息的ID,此外RabbitMQ也可以设置channel.basicAck 方法中的multiple参数,表示到这个序号之前的所有消息都己经得到了处理;稍后介绍handleNack 和 handleAck两个方法我们再举个说明;

4、confirm的机制是异步的,如果消息成功发送,会返回ack消息供异步处理,如果消息发送失败发生异常,也会返回nack消息,confirm的时间没有明确说明,并且同一个消息只会被confirm一次;

接下来介绍几种confirm方法

  1. 普通confirm方法 : 每发送一条消息后,调用channel.waitForConfirms方法,等待服务器的确认返回;
    先看代码样例,注意看注释
//开启confirm模式
channel.confirmSelect();
//模拟发送50条消息
for(int i =0;i<1000;i++){
    String message = "Hello World RabbitMQ";
    //发送消息
    channel.basicPublish(EXCHANGE_NAME,"",MessageProperties.PERSISTENT_TEXT_PLAIN,message.getBytes());
    //每发送2条判断一次是否回复
    if(i%2==0){
        //waitForConfirms可以换成带有时间参数的方法waitForConfirms(Long mills)指定等待响应时间
        if(channel.waitForConfirms()){
              System.out.println("Message send success."); 
          }
     }
}
View Code

相关文章:

  • 2021-06-09
  • 2022-01-11
  • 2022-12-23
  • 2022-03-02
  • 2022-12-23
  • 2021-12-02
  • 2021-05-20
  • 2021-10-23
猜你喜欢
  • 2021-11-12
  • 2022-12-23
  • 2021-08-14
  • 2021-11-25
  • 2021-09-16
  • 2021-10-06
  • 2021-10-02
相关资源
相似解决方案