在上一篇文章讲解MQ消息可靠性投递和幂等性中有提到confirm机制的重要性,现在更详细的说明一下
一、Confirm机制
Confirm就是消息确认,当Producer发送消息,如果Broker收到消息,会回复一个应答,我们可以以此来确认消息是否成功送达,是保证
消息可靠性投递的核心保障
Producer代码如下,只需要修改Producer端,而Consumer端不需要修改
//4 指定我们的消息投递模式: 消息的确认模式 channel.confirmSelect(); //5 发送一条消息 String msg = "Hello RabbitMQ Send confirm message!"; channel.basicPublish(exchangeName, routingKey, null, msg.getBytes()); //6 添加一个确认监听 channel.addConfirmListener(new ConfirmListener() { @Override public void handleNack(long deliveryTag, boolean multiple) throws IOException { System.err.println("-------no ack!-----------"); } @Override public void handleAck(long deliveryTag, boolean multiple) throws IOException { System.err.println("-------ack!-----------"); } });
结果:
-------ack!-----------
只要Producer能把消息发送给Broker,就会返回handlerAck中,返回到NAck的可能很小,例如MQ出现异常,queue的容量达到上限
二、Return消息机制
Return Listener用于处理一些不可路由的消息
Producer:
public class Producer { public static void main(String[] args) throws Exception { //1 创建ConnectionFactory ConnectionFactory factory = new ConnectionFactory(); factory.setVirtualHost("/"); factory.setHost("139.196.75.238"); factory.setPort(5672); //2 获取Connection Connection connection = factory.newConnection(); //3 通过Connection创建一个新的Channel Channel channel = connection.createChannel(); String exchangeName = "exchange_topic"; String routingKey = "fdasfdsafsadf4543453"; //6 添加一个return监听 channel.addReturnListener(new ReturnListener() { @Override public void handleReturn(int replyCode, String replyText, String exchange, String routingKey, AMQP.BasicProperties properties, byte[] body) throws IOException { System.err.println("---------handle return----------"); System.err.println("replyCode: " + replyCode); System.err.println("replyText: " + replyText); System.err.println("exchange: " + exchange); System.err.println("routingKey: " + routingKey); System.err.println("properties: " + properties); System.err.println("body: " + new String(body)); } }); //5 发送一条消息 String msg = "Hello RabbitMQ Send confirm message!"; channel.basicPublish(exchangeName, routingKey, true, null, msg.getBytes()); } }