【问题标题】:configuring multiple Vhosts in AMQP in rabbitmq configuration spring boot在rabbitmq配置spring boot中在AMQP中配置多个Vhost
【发布时间】:2018-07-20 13:21:57
【问题描述】:

我正在实施一个项目,我必须在 rabbitmq 中的不同虚拟主机之间发送消息。使用 SimpleRoutingConnectionFactory 但得到 java.lang.IllegalStateException:无法确定查找键 [null] 的目标 ConnectionFactory。 任何知道如何实现以下内容的人都是我的配置类代码。

@Configuration
@EnableRabbit
public class RabbitMQConfiguration {

@Autowired
ConnectionProperties connect;

// client1 exchanges
@Bean
public TopicExchange client1Exchange() {
    TopicExchange ex = new TopicExchange("ex_client1");
    ex.setAdminsThatShouldDeclare(client1());
    return ex;
}

// client2 exchange
@Bean
public TopicExchange client2Exchange() {
    TopicExchange ex = new TopicExchange("ex_client2");
    ex.setAdminsThatShouldDeclare(client2Admin());
    return ex;
}

@Bean
public Queue client1Queue() {
    Queue queue = new Queue("client1_queue");
    queue.setAdminsThatShouldDeclare(client1());
    return queue;
}

@Bean
public Binding client1Binding() {
    Binding binding = BindingBuilder.bind(client1Queue())
            .to(client1Exchange())
            .with("client1_key");
    binding.setAdminsThatShouldDeclare(client1());
    return binding;
}


@Bean
public Queue client2Queue() {
    Queue queue = new Queue("client2_queue");
    queue.setAdminsThatShouldDeclare(client2());
    return queue;
}

@Bean
public Binding client2Binding() {
    Binding binding = BindingBuilder.bind(client2Queue())
            .to(client2Exchange())
            .with("client2_key");
    binding.setAdminsThatShouldDeclare(client2());
    return binding;
}

@Bean
@Primary
public ConnectionFactory connectionFactory() {
    SimpleRoutingConnectionFactory connectionFactory = new SimpleRoutingConnectionFactory();
    Map<Object, ConnectionFactory> targetConnectionFactories = new HashMap<>();
    targetConnectionFactories.put("client1", client1ConnectionFactory());
    targetConnectionFactories.put("client2", client2ConnectionFactory());
    connectionFactory.setTargetConnectionFactories(targetConnectionFactories);
    return connectionFactory;
}

@Bean
public ConnectionFactory client1ConnectionFactory() {
    CachingConnectionFactory connectionFactory = new CachingConnectionFactory(connect.getRabbitMQHost());
    connectionFactory.setVirtualHost(connect.getRabbitMQClient1VHost());
    connectionFactory.setUsername(connect.getRabbitMQClient1User());
    connectionFactory.setPassword(connect.getRabbitMQClient1Pass());
    return connectionFactory;
}

@Bean
public ConnectionFactory client2ConnectionFactory() {
    CachingConnectionFactory connectionFactory = new CachingConnectionFactory(connect.getRabbitMQHost());
    connectionFactory.setVirtualHost(connect.getRabbitMQClient2VHost());
    connectionFactory.setUsername(connect.getRabbitClient2User());
    connectionFactory.setPassword(connect.getRabbitClient2Pass());
    return connectionFactory;
}

// You can comment all methods below and remove interface's implementation to use the default serialization / deserialization
@Bean
public RabbitTemplate rabbitTemplate() {
    final RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory());
    rabbitTemplate.setMessageConverter(producerJackson2MessageConverter());
    return rabbitTemplate;
}

@Bean
public Jackson2JsonMessageConverter producerJackson2MessageConverter() {
    return new Jackson2JsonMessageConverter();
}

@Bean
public MappingJackson2MessageConverter consumerJackson2MessageConverter() {
    return new MappingJackson2MessageConverter();
}

@Bean
public DefaultMessageHandlerMethodFactory messageHandlerMethodFactory() {
    DefaultMessageHandlerMethodFactory factory = new DefaultMessageHandlerMethodFactory();
    factory.setMessageConverter(consumerJackson2MessageConverter());
    return factory;
}

@Bean
public TaskExecutor rabbitListenerExecutor() {
    int threads = Integer.valueOf(connect.getMinConsumers()) * 2; // threads = min consumers* no of queues
    final ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
    executor.setCorePoolSize(threads);
    executor.setMaxPoolSize(threads);
    executor.setThreadNamePrefix("RabbitThreadListener");
    executor.afterPropertiesSet();
    return executor;
}

@Bean
public SimpleRabbitListenerContainerFactory myRabbitListenerContainerFactory() {
    SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
    factory.setConnectionFactory(connectionFactory());
    factory.setConcurrentConsumers(Integer.valueOf(connect.getMinConsumers()));
    factory.setPrefetchCount(Integer.valueOf(connect.getPrefetchCount()));
    factory.setTaskExecutor(rabbitListenerExecutor());
    factory.setAcknowledgeMode(AcknowledgeMode.MANUAL);
    return factory;
}

@Bean
public RabbitAdmin client1() {
    RabbitAdmin rabbitAdmin = new RabbitAdmin(client1ConnectionFactory());
    rabbitAdmin.afterPropertiesSet();
    return rabbitAdmin;
}

@Bean
public RabbitAdmin client2() {
    RabbitAdmin rabbitAdmin = new RabbitAdmin(client2ConnectionFactory());
    rabbitAdmin.afterPropertiesSet();
    return rabbitAdmin;
}

}

我得到了这个堆栈跟踪

o.s.a.r.l.SimpleMessageListenerContainer - Consumer raised exception, 
processing can restart if the connection factory supports it
java.lang.IllegalStateException: Cannot determine target ConnectionFactory for lookup key [null]
    at org.springframework.amqp.rabbit.connection.AbstractRoutingConnectionFactory.determineTargetConnectionFactory(AbstractRoutingConnectionFactory.java:119)
    at org.springframework.amqp.rabbit.connection.AbstractRoutingConnectionFactory.createConnection(AbstractRoutingConnectionFactory.java:97)
    at org.springframework.amqp.rabbit.connection.ConnectionFactoryUtils$1.createConnection(ConnectionFactoryUtils.java:90)
    at org.springframework.amqp.rabbit.connection.ConnectionFactoryUtils.doGetTransactionalResourceHolder(ConnectionFactoryUtils.java:140)
    at org.springframework.amqp.rabbit.connection.ConnectionFactoryUtils.getTransactionalResourceHolder(ConnectionFactoryUtils.java:76)
    at org.springframework.amqp.rabbit.listener.BlockingQueueConsumer.start(BlockingQueueConsumer.java:505)
    at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer$AsyncMessageProcessingConsumer.run(SimpleMessageListenerContainer.java:1335)
    at java.lang.Thread.run(Thread.java:748)

【问题讨论】:

    标签: java spring-boot rabbitmq spring-amqp


    【解决方案1】:

    RoutingConnectionFactory 一般用于发布消息。

    在侦听器容器中使用路由工厂时,您必须配置查找键以匹配容器中配置的队列名称。

    来自the documentation

    同样从 1.4 版开始,您可以在侦听器容器中配置路由连接工厂。在这种情况下,队列名称列表用作查找键。例如,如果您使用setQueueNames("foo", "bar") 配置容器,则查找键将为"[foo,bar]"(无空格)。

    所以;如果RabbitListener 监听队列foo,则路由查找键必须是[foo]。 (您可以使用不同的键多次添加相同的 CF)。

    或者您可以简单地创建多个容器工厂,每个容器工厂都获得一个具体的 CF 而不是路由 CF。

    编辑

    假设你有

    @RabbitListener(queues = "myQueue", connectionFactory = "myRabbitListenerContainerFactory")
    public void listen(...) {
        ...
    }
    

    如果myQueueclient1 的虚拟主机中,那么您需要在路由器CF 映射中添加一个条目...

    targetConnectionFactories.put("[myQueue]", client1ConnectionFactory());
    

    ...因为为侦听器生成的侦听器容器将在其查找键中使用队列名称。

    或者,创建2个容器工厂;每个都直接连接到 client1 和 client2 CF,而不是路由 CF...

    @Bean
    public SimpleRabbitListenerContainerFactory client1ListenerContainerFactory() {
    
    @Bean
    public SimpleRabbitListenerContainerFactory client2ListenerContainerFactory() {
    

    @RabbitListener(queues = "myQueue", connectionFactory = "client1ListenerContainerFactory")
    public void listen(...) {
        ...
    }
    

    即根本不要将路由 CF 用于侦听器 - 容器只有一个连接。

    【讨论】:

    • 我对rabbitmq很陌生,如果我没有正确理解你,请原谅我。但你是说我实现 routingConnectionFactory 的方式是错误的。基本上为了让它工作我必须在设置中改变什么
    • 否;选项 2 不需要它们,因为侦听器容器将直接从容器工厂为其 vhost 获取正确的 CF。路由 CF 将仅用于发布。顺便说一句,不要将代码放在 cmets 中;很难阅读;最好编辑问题并添加您所做的评论。
    • 啊现在我明白了。所以如果我必须选择选项 2,我仍然需要在这里添加我的所有队列,即使它们是 10。targetConnectionFactories.put("[myQueue1]", client1ConnectionFactory()); targetConnectionFactories.put("[myQueue2]", client1ConnectionFactory()); targetConnectionFactories.put("[myQueue3]", client2ConnectionFactory());
    • 那么发送时 RabbitTemplate 怎么样。我是否必须为第二个虚拟主机实现另一个兔子模板?
    • 请参阅the documentation,了解如何使用RabbitTemplateRoutingConnectionFactory。或者,实际上,您可以为每个使用不同的模板。这实际上取决于您的应用程序的结构,哪个是最好的。
    猜你喜欢
    • 1970-01-01
    • 2018-02-26
    • 2013-04-10
    • 1970-01-01
    • 2017-09-06
    • 1970-01-01
    • 1970-01-01
    • 2018-08-01
    • 1970-01-01
    相关资源
    最近更新 更多