// Based on the Spring Boot framework.
/**
 * Publishes {@link CallDetail} payloads to the {@code delayedExchange} exchange, stamping each
 * message with an {@code x-delay} header, and registers itself on the injected
 * {@link RabbitTemplate} as both confirm callback and return callback.
 *
 * <p>NOTE(review): this class declares {@code RabbitTemplate.ConfirmCallback} and
 * {@code RabbitTemplate.ReturnCallback} but, as written here, defines neither
 * {@code confirm(...)} nor {@code returnedMessage(...)}. Those implementations must be added
 * (or the {@code implements} clause dropped) for the class to compile — TODO confirm whether
 * they were omitted from this excerpt.
 */
@Component
public class CallBackProducer implements RabbitTemplate.ConfirmCallback, RabbitTemplate.ReturnCallback {
    @Autowired
    private RabbitTemplate rabbitTemplate;

    /**
     * Sends {@code callDetail} to {@code delayedExchange} with routing key
     * {@code Constant.QUEUE_NAME}.
     *
     * @param callDetail the payload handed to the template's message converter
     * @param time       value written to the {@code x-delay} message header (presumably the
     *                   delay in milliseconds, per the RabbitMQ delayed-message plugin — confirm)
     */
    public void send(CallDetail callDetail, long time) {
        // Callbacks are (re-)registered with this same instance on every call.
        // NOTE(review): consider moving this to one-time initialisation.
        rabbitTemplate.setReturnCallback(this);
        rabbitTemplate.setConfirmCallback(this);

        // Label the log line with this class; it previously said "DelayedSender",
        // which made it indistinguishable from DelayedSender's own output.
        System.out.println("CallBackProducer 发送时间: " + LocalDateTime.now() + " callDetail内容:" + callDetail);

        // Attach the delay header to the outgoing message just before it is published.
        MessagePostProcessor delayStamper = message -> {
            message.getMessageProperties().setHeader("x-delay", time);
            return message;
        };
        rabbitTemplate.convertAndSend("delayedExchange", Constant.QUEUE_NAME, callDetail, delayStamper);
    }
}
/**
 * Publishes plain string messages to the {@code delayedExchange} exchange, stamping each
 * message with an {@code x-delay} header.
 */
@Component
public class DelayedSender {
    @Autowired
    private AmqpTemplate rabbitTemplate;

    /**
     * Logs the send time and payload to standard output, then sends {@code msg} to
     * {@code delayedExchange} with routing key {@code delayedQueue}.
     *
     * @param msg  the message body handed to the template's message converter
     * @param time value written to the {@code x-delay} message header (presumably the delay
     *             in milliseconds, per the RabbitMQ delayed-message plugin — confirm)
     */
    public void send(String msg, long time) {
        LocalDateTime sentAt = LocalDateTime.now();
        System.out.println("DelayedSender 发送时间: " + sentAt + " msg内容:" + msg);

        // Attach the delay header to the outgoing message just before it is published.
        MessagePostProcessor delayStamper = outgoing -> {
            outgoing.getMessageProperties().setHeader("x-delay", time);
            return outgoing;
        };

        // NOTE(review): the routing key is the literal "delayedQueue"; verify it matches the
        // key the queue is actually bound with.
        rabbitTemplate.convertAndSend("delayedExchange", "delayedQueue", msg, delayStamper);
    }
}
/**
 * Declares the custom exchange used for delayed messages.
 *
 * @return a durable, non-auto-delete exchange named {@code delayedExchange} of type
 *         {@code x-delayed-message}, with the {@code x-delayed-type} argument set to
 *         {@code direct}
 */
@Bean
CustomExchange delayedExchange() {
    final boolean durable = true;
    final boolean autoDelete = false;

    Map<String, Object> exchangeArguments = new HashMap<>();
    // Presumably selects the routing behaviour the delayed exchange applies once the delay
    // has elapsed (see the delayed-message plugin docs) — confirm.
    exchangeArguments.put("x-delayed-type", "direct");

    return new CustomExchange("delayedExchange", "x-delayed-message", durable, autoDelete, exchangeArguments);
}
/**
 * Declares the queue that is bound to the delayed exchange.
 *
 * @return a durable queue named {@code Constant.QUEUE_NAME}
 */
@Bean
public Queue delayedQueue() {
    final boolean durable = true;
    return new Queue(Constant.QUEUE_NAME, durable);
}
/**
 * Binds the queue from {@link #delayedQueue()} to the exchange from
 * {@link #delayedExchange()}.
 *
 * @return the binding, using {@code Constant.QUEUE_NAME} as the routing key and no
 *         binding arguments
 */
@Bean
public Binding bindingNotify() {
    return BindingBuilder
            .bind(delayedQueue())
            .to(delayedExchange())
            .with(Constant.QUEUE_NAME)
            .noargs();
}