【问题标题】:How to use ConnectionListner and/or ChannelListner for logging failure/success of message delivery in RabbitMQ如何使用 ConnectionListner 和/或 ChannelListner 在 RabbitMQ 中记录消息传递的失败/成功
【发布时间】:2021-04-01 21:06:44
【问题描述】:

我正在尝试记录在 RabbitMQ 中发送消息期间发生的任何信息或异常,因为我尝试在现有连接工厂上添加 ConnectionListener。

    kRabbitTemplate.getConnectionFactory().addConnectionListener(new ConnectionListener() {

        @Override
        public void onCreate(Connection connection) {
            System.out.println("Connection Created");
        }

        @Override
        public void onShutDown(ShutdownSignalException signal) {
            System.out.println("Connection Shutdown "+signal.getMessage());
        }

    });
    kRabbitTemplate.convertAndSend(exchange, routingkey, empDTO);       
    

为了测试异常场景,我从 RabbitMQ 控制台取消绑定甚至删除了队列。但我没有得到任何异常或任何关闭方法调用。

虽然,当我停止 RabbitMQ 服务时,我得到了

Exception in thread "Thread-5" org.springframework.amqp.AmqpConnectException: java.net.ConnectException: Connection refused: connect

但是这个异常不是来自我添加的监听器。

我想知道

  1. 为什么我没有收到任何异常或来自关机方法的调用
  2. 如何使用 ConnectionListner 和/或 ChannelListner 记录消息传递的失败/成功。
  3. 我们可以使用 AMQP 附加程序吗?如果可以,我们该怎么做? (任何示例/教程)
  4. 还有哪些其他方法可以确保发送消息?

注意:我不想使用发布者确认的方法。

【问题讨论】:

    标签: rabbitmq spring-amqp spring-rabbit


    【解决方案1】:

    Connection Refused 不是ShutdownSignalException - 从未建立连接,因为服务器/端口上不存在代理。

    您不能使用侦听器来确认单个消息的传递或返回;使用发布者确认并返回。

    https://docs.spring.io/spring-amqp/docs/current/reference/html/#publishing-is-async

    有关如何使用附加程序,请参阅文档。

    https://docs.spring.io/spring-amqp/docs/current/reference/html/#logging

    编辑

    要获得连接失败的通知,您目前需要使用其他技术,具体取决于您是发送还是接收。

    这里有一个例子说明如何:

    @SpringBootApplication
    public class So66882099Application {
    
        private static final Logger log = LoggerFactory.getLogger(So66882099Application.class);
    
        public static void main(String[] args) {
            SpringApplication.run(So66882099Application.class, args);
        }
    
        @RabbitListener(queues = "foo")
        void listen(String in) {
    
        }
    
        // consumer side listeners for no connection
    
        @EventListener
        void consumerFailed(ListenerContainerConsumerFailedEvent event) {
            log.error(event + " via event listener");
            if (event.getThrowable() instanceof AmqpConnectException) {
                log.error("Broker down?");
            }
        }
    
        // or
    
        @Bean
        ApplicationListener<ListenerContainerConsumerFailedEvent> eventListener() {
            return event -> log.error(event + " via application listener");
        }
    
        // producer side - use a RetryListener
    
        @Bean
        RabbitTemplate template(ConnectionFactory cf) {
            RabbitTemplate rabbitTemplate = new RabbitTemplate(cf);
            RetryTemplate retry = new RetryTemplate();
            // configure retries here as needed
            retry.registerListener(new RetryListener() {
    
                @Override
                public <T, E extends Throwable> boolean open(RetryContext context, RetryCallback<T, E> callback) {
                    return true;
                }
    
                @Override
                public <T, E extends Throwable> void onError(RetryContext context, RetryCallback<T, E> callback,
                        Throwable throwable) {
    
                    log.error("Send failed " + throwable.getMessage());
                }
    
                @Override
                public <T, E extends Throwable> void close(RetryContext context, RetryCallback<T, E> callback,
                        Throwable throwable) {
                }
    
            });
            rabbitTemplate.setRetryTemplate(retry);
            return rabbitTemplate;
        }
    
    
        @Bean
        public ApplicationRunner runner(RabbitTemplate template) {
            return args -> {
                try {
                    template.convertAndSend("foo", "bar");
                }
                catch (Exception e) {
                    e.printStackTrace();
                }
            };
        }
    
    }
    

    【讨论】:

    • 在添加 ConnectionListner/ChannelListner 时,我们会在创建连接/通道时收到 onCreate 调用,我们还在 ConnectionListner/ChannelListner 的 onShutDown 方法中添加了日志记录,以检查连接/通道何时关闭。所以通过停止rabbitMQ服务,我们试图模拟关闭RabbitMQ连接/通道的场景,但是没有成功,请您告诉我们这个onShutDown方法将在什么场景下被调用以及我们如何模拟这些场景?
    • 我正在尝试利用 ConnectionListener 和 ChannelListener 的现有功能来记录与连接/通道关闭相关的任何失败场景。您能否详细说明这些课程中对此的支持?
    • 没有什么好阐述的;连接监听器目前都是关于已建立的连接;它在连接打开时调用,并且在该连接正常关闭(或收到关闭信号)时再次调用。如果从未有过连接,则不涉及。以前没有人要求过这个;我将打开一个新功能问题;同时,我将编辑我的答案以展示如何在此用例中使用其他侦听器。
    • 我添加了一个示例来展示如何获取连接失败通知;我还开了一个新功能请求github.com/spring-projects/spring-amqp/issues/1315
    猜你喜欢
    • 1970-01-01
    • 1970-01-01
    • 2017-12-13
    • 1970-01-01
    • 1970-01-01
    • 2017-11-24
    • 2014-02-26
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多