【问题标题】:Springboot RabbitMQ not receiving Anonymous QueuesSpringboot RabbitMQ 不接收匿名队列
【发布时间】:2018-09-26 01:00:18
【问题描述】:

我正在尝试创建一个扇出交换,每个人都会在其中接收来自发布者的消息。我的问题是,在队列中发布的消息无法被侦听器拾取。设置的队列都是随应用程序实例而死的匿名队列。发布者和订阅者在同一个应用程序中。非常感谢任何帮助。

队列配置:

@Value("${apcp.rabbitmq.refresh-exchange}")
private String fanoutExchangeName;

@Autowired
Queue anonQueue; 
@Bean("amqp-admin")
@PostConstruct
public AmqpAdmin AMQPAdmin(){
    log.info(connectionFactory.toString());
    AmqpAdmin amqpAdmin = new RabbitAdmin(connectionFactory);
    return amqpAdmin;
}
@Bean
@PostConstruct
public String initRefreshAmqp(){
    setupFanOutExchange();
    return "";
}
public void setupFanOutExchange(){
    AmqpAdmin amqpAdmin =  new RabbitAdmin(connectionFactory);
    FanoutExchange exchange = new FanoutExchange(fanoutExchangeName);
    amqpAdmin.declareExchange(exchange);
    Queue queue = new Queue(anonQueue, false, true, true);
    amqpAdmin.declareQueue(queue);
    amqpAdmin.declareBinding(BindingBuilder.bind(queue).to(exchange));
}

出版商

@RequestMapping(value = "/publish")
public String publish(String message){
    rabbitTemplate.convertAndSend(exchangeName, message);
    return "";
}

订阅者配置

@Bean
@PostConstruct
public SimpleRabbitListenerContainerFactory listenerFactory() {
    log.info("CONNECTIONS:"+connectionFactory.toString());
    SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
    factory.setConnectionFactory(connectionFactory);
    factory.setMessageConverter(jsonMessageConverter());
    return factory;
}

订阅者监听器

@RabbitListener(queues = "#{anonQueue.name}", containerFactory = "listenerFactory")
public void receiverQueue(String message){
    log.info(message);
}

【问题讨论】:

  • 每个听众有一个队列吗?对我来说,您需要的似乎更像是一种发布/订阅风格的通信。

标签: java spring spring-boot rabbitmq spring-rabbit


【解决方案1】:

1) 没有这样的方法:

rabbitTemplate.convertAndSend(exchangeName, message);

两个参数的方法是

public void convertAndSend(String routingKey, final Object object) throws AmqpException {

所以代理正在丢弃您的消息。

2) 你不能在 bean 定义中调用管理方法(或做任何涉及代理的事情)

3) 您的配置比需要的复杂得多。

这很好用...

@SpringBootApplication
public class So49854747Application {

    public static void main(String[] args) {
        SpringApplication.run(So49854747Application.class, args).close();
    }

    @Bean
    public ApplicationRunner runner(RabbitTemplate template, FanoutExchange exchange) {
        return args -> {
            template.convertAndSend(exchange.getName(), "", "foo");
            Thread.sleep(10_000);
        };
    }

    @Bean
    public Queue anonQueue() {
        return new AnonymousQueue();
    }

    @Bean
    public FanoutExchange exchange() {
        return new FanoutExchange("so49854747");
    }

    @Bean
    public Binding binding() {
        return BindingBuilder.bind(anonQueue()).to(exchange());
    }

    @RabbitListener(queues = "#{anonQueue.name}")
    public void listen(String in) {
        System.out.println(in);
    }

}

.

2018-04-16 09:01:54.620  INFO 50389 --- [           main] com.example.So49854747Application        : Started So49854747Application in 1.407 seconds (JVM running for 1.909)
foo

【讨论】:

  • 谢谢,这个解决方案确实有助于本地接收,但是当我将应用程序部署到 pcf 云时似乎无法创建匿名队列。我会在这部分工作,但非常感谢您的解决方案
猜你喜欢
  • 2020-07-26
  • 1970-01-01
  • 2013-07-05
  • 2022-08-19
  • 2016-08-21
  • 1970-01-01
  • 2021-06-14
  • 2013-06-13
  • 1970-01-01
相关资源
最近更新 更多