【发布时间】:2015-08-17 16:24:01
【问题描述】:
问题
Java 应用程序使用 RabbitMQ 和客户端库 com.rabbitmq:amqp-client 连接到它。应用程序在初始化期间声明 AMQP 交换并定期向其发布消息。
如果该交换因某种原因被删除,应用程序将无法向其发布消息,并且库会自动关闭 AMQP 通道。因此,任何后续发布(即使在重新创建交换之后)都会失败,并出现此类异常:
Exception in thread "main" com.rabbitmq.client.AlreadyClosedException: channel is already closed due to channel error; protocol method: #method<channel.close>(reply-code=404, reply-text=NOT_FOUND - no exchange 'logs' in vhost '/', class-id=60, method-id=40)
at com.rabbitmq.client.impl.AMQChannel.ensureIsOpen(AMQChannel.java:195)
at com.rabbitmq.client.impl.AMQChannel.transmit(AMQChannel.java:296)
at com.rabbitmq.client.impl.ChannelN.basicPublish(ChannelN.java:648)
at com.rabbitmq.client.impl.ChannelN.basicPublish(ChannelN.java:631)
at com.rabbitmq.client.impl.ChannelN.basicPublish(ChannelN.java:622)
如何保证在发布之前交换确实存在?我看到以下选项。
可能的解决方案
1。尝试在每次明确发布之前重新声明交换
感谢exchangeDeclare 是幂等的,如果交换已经到位,我可以在任何发布之前明确声明交换:
channel.exchangeDeclare(EXCHANGE_NAME, "fanout", false, true, null);
channel.basicPublish(EXCHANGE_NAME, "", MessageProperties.PERSISTENT_TEXT_PLAIN, message);
这段代码的问题在于它看起来很愚蠢,因为大多数时候交换已经到位,而声明只是多余的。
如果在声明和发布之间完全删除交换,我仍然会遇到麻烦。
2。通过exchangeDeclarePassive 验证交换是否存在
我可以使用exchangeDeclarePassive 在发布之前检查交换是否存在,但以下明显的方法不起作用:
private static void ensureExchangeExists(Channel channel) throws IOException {
try {
channel.exchangeDeclarePassive(EXCHANGE_NAME);
} catch (Exception e) {
channel.exchangeDeclare(EXCHANGE_NAME, "fanout", false, true, null);
}
}
问题是如果交换丢失,exchangeDeclarePassive 会抛出异常,并且库会自动关闭通道。所以 catch 块中的代码不能声明交换(因为它试图在关闭的通道上执行操作)。
所以我不能再使用单个 AMQP 通道并且必须以某种方式管理它们。
如果在声明和发布之间删除交换,我仍然有麻烦。
3。发布时捕获异常
您不能只用 try/catch 块包装调用 channel.basicPublish,因为如果交换丢失,则不会引发异常。 This answer 解释了在这种情况下实际发生的情况。
但是您可以注册一个ShutdownListener,它能够检测通道何时关闭,调查原因(通过cause.isInitiatedByApplication()/cause.getReason())并采取必要的措施。
问题
问题是在向它发布消息之前确保 AMQP 交换存在的最佳方法是什么?为什么?
【问题讨论】: