说明
本篇博文主要记录了springboot中rabbitmq的使用。使用rabbitmq实现消息消费处理失败后的延迟重试。
正文
rabbitmq不支持延迟任务,所以使用rabbitmq的支持的其他特性来实现延迟任务。这里主要使用了消息的有效期(TTL)和死信路由DLX。消息的有效期可以通过队列设置,也可以通过设置消息的相关属性。死信路由的本质是一个Topic路由器,通过在创建缓冲队列时声明"x-dead-letter-exchange"属性,将缓冲队列与DLX绑定,当消息在缓冲队列到期时,会自动通过DLX路由到与消息携带的路由关键字(routingKey)匹配的队列中。
简单来说,实现消息的失败重试的总体流程为:消息生产者将消息发送到普通队列,消费者接受到后进行处理,若处理失败,将该消息发送到缓冲队列,消息在队列到期后会自动的被发送到普通队列,再次消费。原理图如下:
1.配置类MQConfig
配置类用来创建生产者,消费者,创建队列,交换机和绑定关系。
@Configuration
public class MQConfig {
//创建消费者
@Bean
public Consumer consumer() {
return new Consumer();
}
//创建生产者
@Bean
public Producer producer() {
return new Producer();
}
//创建生产者的topic exchange
@Bean
public TopicExchange proExchange() {
return new TopicExchange("proExc");
}
//创建消费者的fanout exchange
@Bean
public FanoutExchange conExchange() {
return new FanoutExchange("conExc");
}
//创建缓冲队列的dead letter exchange (死信路由)
@Bean
public TopicExchange dlxExchange() {
return new TopicExchange("dlxExc");
}
//创建普通(生产者)对列
@Bean
public Queue proQueue() {
return new Queue("proQueue");
}
//创建缓冲队列, 声明dlx
@Bean
public Queue cacheQueue() {
Map<String, Object> args = new HashMap<>();
//dlx的名称必须与创建exchange的名称相同
args.put("x-dead-letter-exchange","dlxExc");
return QueueBuilder.nonDurable("delayqueue").withArguments(args).build();
}
//普通队列与生产者的 topic exchage 绑定
@Bean
public Binding bindingPtoP(@Qualifier("proQueue") Queue proQueue, @Qualifier("proExchange") TopicExchange proExchange) {
return BindingBuilder.bind(proQueue).to(proExchange).with("email.#");
}
//缓冲队列与消息者的 fanout exchange 绑定
@Bean
public Binding bindingCtoF(@Qualifier("cacheQueue") Queue cacheQueue, @Qualifier("conExchange") FanoutExchange conExchange) {
return BindingBuilder.bind(cacheQueue).to(conExchange);
}
//普通队列与死信路由 dlx 绑定
@Bean
public Binding bindingPtoD(@Qualifier("proQueue") Queue proQueue, @Qualifier("dlxExchange") TopicExchange dlxExchange) {
return BindingBuilder.bind(proQueue).to(dlxExchange).with("email.#");
}
}
2.消费者Consumer
在设置消息的有效期时,可以通过MessageProperties来设置expiration,以ms为单位。消息失败被发送到缓冲队列时,这里使用了Fanout Exchange,此交换机会忽略消息携带的routingKey,但是DLX是一个Topic Exchange,到消息到期时,会根据消息携带的routingKey来将消息重新路由到相匹配的队列,所以在失败发送时,要指定routingKey。
public class Consumer {
@Autowired
private FanoutExchange conExchange;
@Autowired
private RabbitTemplate rabbitTemplate;
@RabbitListener(queues = {"proQueue"})
public void receiveEmail(String email) throws UnsupportedEncodingException {
System.out.println("receive a email " + email);
String routingKey = "email";
MessageProperties messageProperties = getMessageProperties();
rabbitTemplate.convertAndSend(conExchange.getName(), routingKey, new Message((email + " try again").getBytes("utf-8"),messageProperties));
}
/**
* 设置消息的相关参数
* @return
*/
private MessageProperties getMessageProperties() {
MessageProperties messageProperties = new MessageProperties();
//设置消息的有效期 ms为单位
messageProperties.setExpiration("5000");
messageProperties.setContentType(MessageProperties.CONTENT_TYPE_TEXT_PLAIN);
return messageProperties;
}
}
3.生产者Producer
生产者将消息发送到一个交换机,这里使用了Topic交换机,也可以使用默认的交换机。Default Exchange是一个Direct类型的交换机,当所有队列被创建时都会自动的用与队列名相同的bindingKey来将队列与默认的交换机绑定。
public class Producer {
@Autowired
private RabbitTemplate rabbitTemplate;
@Resource(name = "proExchange")
private TopicExchange proExchange;
public void sendEmail() {
String message = "this is a email";
String routingKey = "email";
rabbitTemplate.convertAndSend(proExchange.getName(),routingKey,message);
System.out.println("send successfully");
}
}
4.测试
@RestController
public class TestController {
@Autowired
private Producer producer;
@RequestMapping("/send")
public String test() {
producer.sendEmail();
return "test";
}
}
这篇博文只是总结记录了springboot使用rabbitmq来实现延迟消费的用法,有关rabbitmq的更多信息,详见RabbitMQ Tutorials。
参考资料:http://www.rabbitmq.com/getstarted.html
源码地址:https://github.com/Edenwds/springboot_study/tree/master/rabbitmq