【问题标题】:How to get Spring RabbitMQ to create a new Queue?如何让 Spring RabbitMQ 创建一个新队列?
【发布时间】:2013-04-28 13:34:14
【问题描述】:

在我对 rabbit-mq 的(有限)经验中,如果您为尚不存在的队列创建新的侦听器,则会自动创建队列。我正在尝试将 Spring AMQP 项目与 rabbit-mq 一起使用来设置侦听器,但我得到了一个错误。这是我的 xml 配置:

<rabbit:connection-factory id="rabbitConnectionFactory" host="172.16.45.1" username="test" password="password" />

<rabbit:listener-container connection-factory="rabbitConnectionFactory"  >
    <rabbit:listener ref="testQueueListener" queue-names="test" />
</rabbit:listener-container>

<bean id="testQueueListener" class="com.levelsbeyond.rabbit.TestQueueListener"> 
</bean>

我在我的 RabbitMq 日志中得到了这个:

=ERROR REPORT==== 3-May-2013::23:17:24 ===
connection <0.1652.0>, channel 1 - soft error:
{amqp_error,not_found,"no queue 'test' in vhost '/'",'queue.declare'}

还有来自 AMQP 的类似错误:

2013-05-03 23:17:24,059 ERROR [org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer] (SimpleAsyncTaskExecutor-1) - Consumer received fatal exception on startup
org.springframework.amqp.rabbit.listener.FatalListenerStartupException: Cannot prepare queue for listener. Either the queue doesn't exist or the broker will not allow us to use it.

从堆栈跟踪看来,队列是在“被动”模式下创建的 - 谁能指出我将如何不使用被动模式创建队列,所以我看不到这个错误?还是我错过了什么?

【问题讨论】:

    标签: java spring rabbitmq amqp


    【解决方案1】:

    较旧的线程,但这仍然在 Google 上显示得相当高,所以这里有一些更新的信息:

    2015-11-23

    Spring 4.2.x 开始使用 Spring-Messaging 和 Spring-Amqp 1.4.5.RELEASESpring-Rabbit 1.4.5.RELEASE,通过@Configuration 类的一些注解,声明交换、队列和绑定变得非常简单:

    @EnableRabbit
    @Configuration
    @PropertySources({
        @PropertySource("classpath:rabbitMq.properties")
    })
    public class RabbitMqConfig {    
        private static final Logger logger = LoggerFactory.getLogger(RabbitMqConfig.class);
    
        @Value("${rabbitmq.host}")
        private String host;
    
        @Value("${rabbitmq.port:5672}")
        private int port;
    
        @Value("${rabbitmq.username}")
        private String username;
    
        @Value("${rabbitmq.password}")
        private String password;
    
        @Bean
        public ConnectionFactory connectionFactory() {
            CachingConnectionFactory connectionFactory = new CachingConnectionFactory(host, port);
            connectionFactory.setUsername(username);
            connectionFactory.setPassword(password);
    
            logger.info("Creating connection factory with: " + username + "@" + host + ":" + port);
    
            return connectionFactory;
        }
    
        /**
         * Required for executing adminstration functions against an AMQP Broker
         */
        @Bean
        public AmqpAdmin amqpAdmin() {
            return new RabbitAdmin(connectionFactory());
        }
    
        /**
         * This queue will be declared. This means it will be created if it does not exist. Once declared, you can do something
         * like the following:
         * 
         * @RabbitListener(queues = "#{@myDurableQueue}")
         * @Transactional
         * public void handleMyDurableQueueMessage(CustomDurableDto myMessage) {
         *    // Anything you want! This can also return a non-void which will queue it back in to the queue attached to @RabbitListener
         * }
         */
        @Bean
        public Queue myDurableQueue() {
            // This queue has the following properties:
            // name: my_durable
            // durable: true
            // exclusive: false
            // auto_delete: false
            return new Queue("my_durable", true, false, false);
        }
    
        /**
         * The following is a complete declaration of an exchange, a queue and a exchange-queue binding
         */
        @Bean
        public TopicExchange emailExchange() {
            return new TopicExchange("email", true, false);
        }
    
        @Bean
        public Queue inboundEmailQueue() {
            return new Queue("email_inbound", true, false, false);
        }
    
        @Bean
        public Binding inboundEmailExchangeBinding() {
            // Important part is the routing key -- this is just an example
            return BindingBuilder.bind(inboundEmailQueue()).to(emailExchange()).with("from.*");
        }
    }
    

    一些可提供帮助的资源和文档:

    1. Spring annotations
    2. Declaring/configuration RabbitMQ for queue/binding support
    3. Direct exchange binding (for when routing key doesn't matter)

    注意:看起来我错过了一个版本——从 Spring AMQP 1.5 开始,事情变得更加容易,因为您可以在侦听器处声明完整的绑定!

    【讨论】:

    • 您创建 bean 并使用方法调用再次创建对象。
    【解决方案2】:

    似乎可以解决我的问题的是添加管理员。这是我的xml:

    <rabbit:listener-container connection-factory="rabbitConnectionFactory"  >
        <rabbit:listener ref="orderQueueListener" queues="test.order" />
    </rabbit:listener-container>
    
    <rabbit:queue name="test.order"></rabbit:queue>
    
    <rabbit:admin id="amqpAdmin" connection-factory="rabbitConnectionFactory"/>
    
    <bean id="orderQueueListener" class="com.levelsbeyond.rabbit.OrderQueueListener">   
    </bean>
    

    【讨论】:

    • 是的,您必须在配置中添加RabbitAdmin 以自动声明队列、交换、绑定。
    • 另外,在向它发送消息之前不会声明队列。
    • 它也可以在 Java 注释模式下工作:添加自动装配的 AmqpAdmin 并使用专用方法声明队列会在不存在时自动创建队列!
    • 使用 rabbit:listener 元素,如何让 rabbit 声明一个随机命名的队列以用于扇出交换?
    【解决方案3】:

    Spring Boot 2.1.6Spring AMQP 2.1.7 开始,如果队列不存在,您可以在启动期间创建队列:

    @Component
    public class QueueConfig {
    
        private AmqpAdmin amqpAdmin;
    
        public QueueConfig(AmqpAdmin amqpAdmin) {
            this.amqpAdmin = amqpAdmin;
        }
    
        @PostConstruct
        public void createQueues() {
            amqpAdmin.declareQueue(new Queue("queue_one", true));
            amqpAdmin.declareQueue(new Queue("queue_two", true));
        }
    }
    

    【讨论】:

      【解决方案4】:

      你可以在你的连接标签之后,但在监听器之前添加这个吗:

      <rabbit:queue name="test" auto-delete="true" durable="false" passive="false" />
      

      不幸的是,根据 XSD 架构,被动属性(上面列出的)无效。但是,在我见过的每个 queue_declare 实现中,passive 都是一个有效的 queue_declare 参数。我很想知道这是否可行,或者他们是否计划在未来支持它。

      以下是队列声明选项的完整列表: http://www.rabbitmq.com/amqp-0-9-1-reference.html#class.queue

      这里是 spring rabbit 模式的完整 XSD(包括 cmets): http://www.springframework.org/schema/rabbit/spring-rabbit-1.0.xsd

      【讨论】:

      • 不幸的是,我收到一个错误,即启动时队列元素中不允许使用“被动”属性。 “declare”方法采用被动参数很奇怪,但我似乎无法在 xml 中定义它。
      • 对不起,我太密集了——我看不出我错过了什么。我看到你说根据 xsd 它不是有效的 xml,我可以忽略 eclipse 给我的错误 - 但我实际上也遇到了运行时错误。
      • 没关系。我的中间名很密集。 :-) 我认为我们说的是同一件事。恕我直言,如果它是完整的兔子接口,则应该存在上述被动参数。 Spring RabbitMQ 缺少它。我会向他们提交错误报告,并在您的错误报告中链接回这个问题。他们可能知道我们都缺少的东西,或者这可能只是一个合理的遗漏。
      猜你喜欢
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 2018-12-22
      • 1970-01-01
      • 2017-07-30
      • 1970-01-01
      相关资源
      最近更新 更多