在上一篇文章讲解MQ消息可靠性投递和幂等性中有提到confirm机制的重要性,现在更详细的说明一下

一、Confirm机制

  Confirm就是消息确认,当Producer发送消息,如果Broker收到消息,会回复一个应答,我们可以以此来确认消息是否成功送达,是保证

消息可靠性投递的核心保障

Producer代码如下,只需要修改Producer端,而Consumer端不需要修改

//4 指定我们的消息投递模式: 消息的确认模式
channel.confirmSelect();

//5 发送一条消息
String msg = "Hello RabbitMQ Send confirm message!";
channel.basicPublish(exchangeName, routingKey, null, msg.getBytes());

//6 添加一个确认监听
channel.addConfirmListener(new ConfirmListener() {
    @Override
    public void handleNack(long deliveryTag, boolean multiple) throws IOException {
        System.err.println("-------no ack!-----------");
    }

    @Override
    public void handleAck(long deliveryTag, boolean multiple) throws IOException {
        System.err.println("-------ack!-----------");
    }
});

结果:

-------ack!-----------

只要Producer能把消息发送给Broker,就会返回handlerAck中,返回到NAck的可能很小,例如MQ出现异常,queue的容量达到上限

二、Return消息机制

Return Listener用于处理一些不可路由的消息

Producer:

public class Producer {

    public static void main(String[] args) throws Exception {
        //1 创建ConnectionFactory
        ConnectionFactory factory = new ConnectionFactory();
        factory.setVirtualHost("/");
        factory.setHost("139.196.75.238");
        factory.setPort(5672);

        //2 获取Connection
        Connection connection = factory.newConnection();

        //3 通过Connection创建一个新的Channel
        Channel channel = connection.createChannel();

        String exchangeName = "exchange_topic";
        String routingKey = "fdasfdsafsadf4543453";

        //6 添加一个return监听
        channel.addReturnListener(new ReturnListener() {
            @Override
            public void handleReturn(int replyCode, String replyText, String exchange, String routingKey, AMQP.BasicProperties properties, byte[] body) throws IOException {
                System.err.println("---------handle  return----------");
                System.err.println("replyCode: " + replyCode);
                System.err.println("replyText: " + replyText);
                System.err.println("exchange: " + exchange);
                System.err.println("routingKey: " + routingKey);
                System.err.println("properties: " + properties);
                System.err.println("body: " + new String(body));
            }
        });
        //5 发送一条消息
        String msg = "Hello RabbitMQ Send confirm message!";
        channel.basicPublish(exchangeName, routingKey, true, null, msg.getBytes());

    }
}
Producer Return

相关文章: