一、前言:
1)生产者的消息,往mq服务器投递时,因网络中断、抖动等原因导致投递失败;
2)本文采取实现ConfirmCallback接口,对消息投递进行确认;
a、若返回true,则认为消息已成功投递;
b、若返回false,则认为消息投递失败,需要重新投递;
3)ConfirmCallback接口在消息不能到达exchange时进行回调;
4)模拟消息投递失败的方案:在消息投递时,强制关闭connection连接;
二、可靠投递的具体方案:
1)生产者在发送消息之前,生成ack唯一确认id;
2)以ackId为键,消息为value,保存进redis缓存;
3)实现ConfirmCallback接口;
4)ConfirmCallback触发回调时,若ack为true,则直接删除此次ackId对应的msg;若ack为false,则走5步骤;
5)若ack为false,则将该ackId对应的msg取出,放置进失败的ackFailList中;
6)开启定时任务,扫描ackFailList,以一定策略重发失败的msg;
上述方案会产生消息重复发送的问题,考虑到消息发送的速度应尽可能快,才能支持更高的并发。故去重动作,放到了消费端实现。具体方案如下:
消息重复消费处理方案:
1)使用业务ID,即businessId,作为消费唯一依据;(businessId可设计为该业务的具体业务ID,或是设计为与业务无关的唯一性ID)
2)生产者在发送消息时,将businessId、businessBody封装在BusinessMsg中;
3)消费者在接收到BusinessMsg后,判断bisinessId是否在已消费集合;
4)若已消费集合存在该businessId,则不再消费;
5)否则,消费businessId代表的业务消息;消费完毕后,将该businessId保存进已消费集合;
详述:
1、定义队列;
2、定义生产者;
3、定义消费者;
4、对信道开启confirm模式;
4、实现ConfirmCallback接口;
5、测试定义;
6、消息投递丢失测试,测试数据量为3w;
1)在消息投递过程中,强制关闭connection连接;
2)测试结果如下:
3)结论:
a)消息没有丢失;
b)消息接收重复了,总量为39548条;
c)消息消费符合预期,总量为3w条,正常;
7、demo地址;
https://github.com/haishui211/rabbitmqRep.git
8、参考地址;
https://www.jianshu.com/p/6579e48d18ae