【发布时间】:2019-11-02 21:32:22
【问题描述】:
我有向 RabbitMQ 发送消息的旧系统。
系统只使用一个队列:q.finance.invoice,但它有两种类型的消息,其中消息类型在标题中可用。
第一种
Type : invoice.created
{
"field_1" : "",
"field_2" : "",
}
第二种
Type : invoice.paid
{
"field_5" : "",
"field_6" : "",
}
所以现在我的消费者需要根据数据类型有选择地处理消息。
Spring 有@RabbitHandler 可以做到这一点......如果消息是由 spring 发布的。
我不能使用 @RabbitHandler 注释。
我认为这是因为 @RabbitHandler 正在转换基于旧系统中不存在的 __TypeId__ 标头的消息。
如何模拟这种@RabbitHandler 行为(根据其类型获取数据)?
所以我使用@RabbitListener 来消费消息。
但是@RabbitListener 正在接收所有类型的消息。
我们使用@RabbitListener 的另一个原因是因为我们的错误处理程序依赖于Message 和Channel
我们拥有的基本方法签名是这样的:
@RabbitListener(queues = "q.finance.invoice")
public void listenInvoicePaid(Message message, Channel channel, @Header(AmqpHeaders.DELIVERY_TAG) long tag) {
// convert message body JSON string to object
// process it
}
我正在尝试根据类型进行手动拒绝,这很有效。但是当我有很多听众或队列时,我确信它是不可扩展的
import java.io.IOException;
import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.amqp.support.AmqpHeaders;
import org.springframework.messaging.handler.annotation.Header;
import org.springframework.stereotype.Service;
import com.rabbitmq.client.Channel;
@Service
public class InvoiceListenerOnMethod {
private static final Logger log = LoggerFactory.getLogger(InvoiceListenerOnMethod.class);
@RabbitListener(queues = "q.finance.invoice")
public void listenInvoiceCreated(Message message, Channel channel, @Header(AmqpHeaders.DELIVERY_TAG) long tag)
throws IOException {
if (!StringUtils.equalsIgnoreCase("invoice.created", message.getMessageProperties().getType())) {
log.warn("[on Method] Rejecting invoice created : {}", message);
channel.basicReject(tag, true);
return;
}
log.info("[on Method] Listening invoice created : {}", message);
}
@RabbitListener(queues = "q.finance.invoice")
public void listenInvoicePaid(Message message, Channel channel, @Header(AmqpHeaders.DELIVERY_TAG) long tag)
throws IOException {
if (!StringUtils.equalsIgnoreCase("invoice.paid", message.getMessageProperties().getType())) {
log.warn("[on Method] Rejecting invoice paid : {}", message);
channel.basicReject(tag, true);
return;
}
log.info("[on Method] Listening invoice paid : {}", message);
}
}
看,如果我有 4 条消息(paid-paid-created-created),监听器可以运行超过 4 次,因为我们无法控制谁将接收哪条消息。所以listenInvoicePaid()可以是这样的
- 拒绝()
- 拒绝()
- 确认()
- 拒绝()
- 确认()
同样的方式,在 ack() 之前多次拒绝() 也可以在 listenInvoiceCreated()
中发生
因此,在正确处理所有消息之前,我总共可以调用大约 10 条消息。
有什么修复代码的建议吗?
【问题讨论】:
标签: spring-boot spring-amqp spring-rabbit