说明

本篇博文主要记录了springboot中rabbitmq的使用。使用rabbitmq实现消息消费处理失败后的延迟重试。

正文

  rabbitmq不支持延迟任务,所以使用rabbitmq的支持的其他特性来实现延迟任务。这里主要使用了消息的有效期(TTL)死信路由DLX。消息的有效期可以通过队列设置,也可以通过设置消息的相关属性。死信路由的本质是一个Topic路由器,通过在创建缓冲队列时声明"x-dead-letter-exchange"属性,将缓冲队列与DLX绑定,当消息在缓冲队列到期时,会自动通过DLX路由到与消息携带的路由关键字(routingKey)匹配的队列中。
  简单来说,实现消息的失败重试的总体流程为:消息生产者将消息发送到普通队列,消费者接受到后进行处理,若处理失败,将该消息发送到缓冲队列,消息在队列到期后会自动的被发送到普通队列,再次消费。原理图如下:
springboot学习(十三):RabbitMQ的使用 实现消息延迟消费

1.配置类MQConfig

配置类用来创建生产者,消费者,创建队列,交换机和绑定关系。

@Configuration
public class MQConfig {

    //创建消费者
    @Bean
    public Consumer consumer() {
        return new Consumer();
    }

    //创建生产者
    @Bean
    public Producer producer() {
        return new Producer();
    }

    //创建生产者的topic exchange
    @Bean
    public TopicExchange proExchange() {
        return new TopicExchange("proExc");
    }

    //创建消费者的fanout exchange
    @Bean
    public FanoutExchange conExchange() {
        return new FanoutExchange("conExc");
    }

    //创建缓冲队列的dead letter exchange (死信路由)
    @Bean
    public TopicExchange dlxExchange() {
        return new TopicExchange("dlxExc");
    }

    //创建普通(生产者)对列
    @Bean
    public Queue proQueue() {
        return new Queue("proQueue");
    }

    //创建缓冲队列, 声明dlx
    @Bean
    public Queue cacheQueue() {
        Map<String, Object> args = new HashMap<>();
        //dlx的名称必须与创建exchange的名称相同
        args.put("x-dead-letter-exchange","dlxExc");
        return QueueBuilder.nonDurable("delayqueue").withArguments(args).build();
    }

    //普通队列与生产者的 topic exchage 绑定
    @Bean
    public Binding bindingPtoP(@Qualifier("proQueue") Queue proQueue, @Qualifier("proExchange") TopicExchange proExchange) {
        return BindingBuilder.bind(proQueue).to(proExchange).with("email.#");
    }

    //缓冲队列与消息者的 fanout exchange 绑定
    @Bean
    public Binding bindingCtoF(@Qualifier("cacheQueue") Queue cacheQueue, @Qualifier("conExchange") FanoutExchange conExchange) {
        return BindingBuilder.bind(cacheQueue).to(conExchange);
    }

    //普通队列与死信路由 dlx 绑定
    @Bean
    public Binding bindingPtoD(@Qualifier("proQueue") Queue proQueue, @Qualifier("dlxExchange") TopicExchange dlxExchange) {
        return BindingBuilder.bind(proQueue).to(dlxExchange).with("email.#");
    }

}

2.消费者Consumer

在设置消息的有效期时,可以通过MessageProperties来设置expiration,以ms为单位。消息失败被发送到缓冲队列时,这里使用了Fanout Exchange,此交换机会忽略消息携带的routingKey,但是DLX是一个Topic Exchange,到消息到期时,会根据消息携带的routingKey来将消息重新路由到相匹配的队列,所以在失败发送时,要指定routingKey。

public class Consumer {

    @Autowired
    private FanoutExchange conExchange;

    @Autowired
    private RabbitTemplate rabbitTemplate;

    @RabbitListener(queues = {"proQueue"})
    public void receiveEmail(String email) throws UnsupportedEncodingException {
        System.out.println("receive a email " + email);
        String routingKey = "email";
        MessageProperties messageProperties = getMessageProperties();
        rabbitTemplate.convertAndSend(conExchange.getName(), routingKey, new Message((email + " try again").getBytes("utf-8"),messageProperties));
    }

    /**
     * 设置消息的相关参数
     * @return
     */
    private MessageProperties getMessageProperties() {
        MessageProperties messageProperties = new MessageProperties();
        //设置消息的有效期 ms为单位
        messageProperties.setExpiration("5000");
        messageProperties.setContentType(MessageProperties.CONTENT_TYPE_TEXT_PLAIN);
        return messageProperties;
    }
}

3.生产者Producer

生产者将消息发送到一个交换机,这里使用了Topic交换机,也可以使用默认的交换机。Default Exchange是一个Direct类型的交换机,当所有队列被创建时都会自动的用与队列名相同的bindingKey来将队列与默认的交换机绑定。

public class Producer {

    @Autowired
    private RabbitTemplate rabbitTemplate;

    @Resource(name = "proExchange")
    private TopicExchange proExchange;

    public void sendEmail() {
        String message = "this is a email";
        String routingKey = "email";
        rabbitTemplate.convertAndSend(proExchange.getName(),routingKey,message);
        System.out.println("send successfully");
    }
}

4.测试

@RestController
public class TestController {

    @Autowired
    private Producer producer;

    @RequestMapping("/send")
    public String test() {
        producer.sendEmail();
        return "test";
    }
}

springboot学习(十三):RabbitMQ的使用 实现消息延迟消费

这篇博文只是总结记录了springboot使用rabbitmq来实现延迟消费的用法,有关rabbitmq的更多信息,详见RabbitMQ Tutorials

参考资料:http://www.rabbitmq.com/getstarted.html
源码地址:https://github.com/Edenwds/springboot_study/tree/master/rabbitmq

相关文章: