【问题标题】:Ensure that AMQP exchange exists before publishing a message to it [closed]在向其发布消息之前确保 AMQP 交换存在 [关闭]
【发布时间】:2015-08-17 16:24:01
【问题描述】:

问题

Java 应用程序使用 RabbitMQ 和客户端库 com.rabbitmq:amqp-client 连接到它。应用程序在初始化期间声明 AMQP 交换并定期向其发布消息。

如果该交换因某种原因被删除,应用程序将无法向其发布消息,并且库会自动关闭 AMQP 通道。因此,任何后续发布(即使在重新创建交换之后)都会失败,并出现此类异常:

Exception in thread "main" com.rabbitmq.client.AlreadyClosedException: channel is already closed due to channel error; protocol method: #method<channel.close>(reply-code=404, reply-text=NOT_FOUND - no exchange 'logs' in vhost '/', class-id=60, method-id=40)
at com.rabbitmq.client.impl.AMQChannel.ensureIsOpen(AMQChannel.java:195)
at com.rabbitmq.client.impl.AMQChannel.transmit(AMQChannel.java:296)
at com.rabbitmq.client.impl.ChannelN.basicPublish(ChannelN.java:648)
at com.rabbitmq.client.impl.ChannelN.basicPublish(ChannelN.java:631)
at com.rabbitmq.client.impl.ChannelN.basicPublish(ChannelN.java:622)

如何保证在发布之前交换确实存在?我看到以下选项。

可能的解决方案

1。尝试在每次明确发布之前重新声明交换

感谢exchangeDeclare 是幂等的,如果交换已经到位,我可以在任何发布之前明确声明交换:

channel.exchangeDeclare(EXCHANGE_NAME, "fanout", false, true, null);
channel.basicPublish(EXCHANGE_NAME, "", MessageProperties.PERSISTENT_TEXT_PLAIN, message);

这段代码的问题在于它看起来很愚蠢,因为大多数时候交换已经到位,而声明只是多余的。

如果在声明和发布之间完全删除交换,我仍然会遇到麻烦。

2。通过exchangeDeclarePassive 验证交换是否存在

我可以使用exchangeDeclarePassive 在发布之前检查交换是否存在,但以下明显的方法不起作用:

private static void ensureExchangeExists(Channel channel) throws IOException {
    try {
        channel.exchangeDeclarePassive(EXCHANGE_NAME);
    } catch (Exception e) {
        channel.exchangeDeclare(EXCHANGE_NAME, "fanout", false, true, null);
    }
}

问题是如果交换丢失,exchangeDeclarePassive 会抛出异常,并且库会自动关闭通道。所以 catch 块中的代码不能声明交换(因为它试图在关闭的通道上执行操作)。

所以我不能再使用单个 AMQP 通道并且必须以某种方式管理它们。

如果在声明和发布之间删除交换,我仍然有麻烦。

3。发布时捕获异常

您不能只用 try/catch 块包装调用 channel.basicPublish,因为如果交换丢失,则不会引发异常。 This answer 解释了在这种情况下实际发生的情况。

但是您可以注册一个ShutdownListener,它能够检测通道何时关闭,调查原因(通过cause.isInitiatedByApplication()/cause.getReason())并采取必要的措施。

问题

问题是在向它发布消息之前确保 AMQP 交换存在的最佳方法是什么?为什么?

【问题讨论】:

    标签: java rabbitmq amqp


    【解决方案1】:

    IME,选项 #1 接近于更好的选项。

    我发现最有效的方法是将给定发布者的代码以一种允许我在第一次创建对象实例时声明/重新声明队列的方式封装。然后我可以重复使用相同的对象实例来发布消息,而无需再次重新声明交换。

    如果我创建对象的新实例,它将重新声明交换。但是重复使用同一个实例可以防止这种情况发生。

    这个策略对我很有效。

    另一种选择是在应用启动时预先定义和预先声明您的交换、队列和绑定。这样你就不必再担心了。这里的缺点是,你已经预先确定了所有的交换、队列和绑定......这在某些应用程序中有效,但在其他应用程序中无效。

    【讨论】:

    • 感谢您的回答 Derrick。然而问题是,当应用程序运行时,某些外部方删除了交换。因此,从应用程序的角度来看,它可能随时消失。因此无论是急切初始化还是惰性初始化都不起作用。
    • 在这种情况下,您应该使用选项#1 并在发送任何消息之前重新声明交换。请注意,这将导致巨大的性能损失......您应该与运行其他系统的团队合作以解决问题的根源,但在发送之前重新声明现在应该足够了
    • 嗯,不。我收回之前说过的话。您可能应该使用选项 #1 和选项 #3 的组合,因为正如您在原始帖子中指出的那样,在发布之前可能会删除交换。您应该将丢失的交换视为已知的故障情况,并提供备份解决方案,以便稍后在重新创建交换后重试该消息。这通常是通过日志文件甚至内存中的消息列表来完成的,以从发布者的角度重试
    【解决方案2】:

    经过一番调查,我选择了选项 #3(发布时捕获异常)和 Publisher Confirms,这是跟踪未成功发布的消息所必需的。

    【讨论】:

      猜你喜欢
      • 2017-01-17
      • 2022-01-22
      • 2014-12-28
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 2019-08-21
      • 2019-08-29
      • 1970-01-01
      相关资源
      最近更新 更多