即使没有与您的标头匹配的队列,消息仍被视为已确认的问题。来自文档 (https://www.rabbitmq.com/confirms.html):
对于不可路由的消息,代理将在
exchange 验证消息不会路由到任何队列(返回一个空的
队列列表)。如果消息也被强制发布,则
basic.return 在 basic.ack 之前发送给客户端。也是如此
用于否定确认 (basic.nack)。
相反,您应该检查 basic.return 消息以检测消息是否已被路由。
我用wireshark检查过,确实我可以看到如果一条消息没有被路由,那么就会有一个AMQP basic.return消息。
我想你应该从
开始
channel.addReturnListener(new ReturnListener() {
@Override
public void handleReturn(int replyCode, String replyText, String exchange, String routingKey, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("App.handleReturn");
System.out.println("replyCode = [" + replyCode + "], replyText = [" + replyText + "], exchange = [" + exchange + "], routingKey = [" + routingKey + "], properties = [" + properties + "], body = [" + body + "]");
}
});
事实上,如果消息没有被路由,我会得到这个:
replyCode = [312],replyText = [NO_ROUTE],exchange = [headers_logs],
routingKey = [], pro....
此外,如果您想在 Java 中模拟 Pika 的同步行为,您似乎可以通过在发布消息之前获取当前发布序列号并注册确认侦听器而不是依赖 .waitForConfirmsOrDie() 来实现。
所以一个完整的代码示例应该是:
channel.addReturnListener(new ReturnListener() {
@Override
public void handleReturn(int replyCode, String replyText, String exchange, String routingKey, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("App.handleReturn");
System.out.println("replyCode = [" + replyCode + "], replyText = [" + replyText + "], exchange = [" + exchange + "], routingKey = [" + routingKey + "], properties = [" + properties + "], body = [" + body + "]");
}
});
channel.addConfirmListener(new ConfirmListener() {
@Override
public void handleAck(long deliveryTag, boolean multiple) throws IOException {
System.out.println("App.handleAck");
System.out.println("deliveryTag = [" + deliveryTag + "], multiple = [" + multiple + "]");
}
@Override
public void handleNack(long deliveryTag, boolean multiple) throws IOException {
System.out.println("App.handleNack");
System.out.println("deliveryTag = [" + deliveryTag + "], multiple = [" + multiple + "]");
}
});
long nextPublishSeqNo = channel.getNextPublishSeqNo();
System.out.println("nextPublishSeqNo = " + nextPublishSeqNo);
channel.basicPublish("headers_logs",
"",
true,
props,
"data".getBytes());
在返回/确认回调中,您需要查找在发布消息之前获得的频道发布序列号。
如果您查看线路上发生的情况,如果消息没有被路由到任何队列,RabbitMq 会发送回一条 basic.return 消息,该消息还包含确认(交付标签)。如果消息已被路由,RabbitMq 会发回一条包含确认信息的 bacic.ack 消息。
似乎 RabbitMq Java 客户端总是在 basicConfirm() 之前调用 basicReturn() 回调,因此判断消息是否已路由的逻辑可以是:
在频道上注册返回和确认听众;
记住一个频道的下一个发布序列号;
等待返回或确认回调。如果是返回回调 - 消息尚未路由,您应该忽略对同一传递标签的进一步确认。如果在收到 handleReturn() 之前收到 handleAck() 回调,则表示消息已被路由到队列。
虽然我不确定在哪种情况下可以调用 .handleNack()。