【问题标题】:Detect Exception on RabbitMq connection检测 RabbitMq 连接上的异常
【发布时间】:2021-06-14 20:24:57
【问题描述】:

使用 spring 集成将消息从 RabbitMQ 传输到 MQ 效果很好。

如果我停止 RabbitMQ 服务器,那么我们的日志文件就会出错:

ERROR o.s.a.r.c.CachingConnectionFactory - Channel shutdown: connection error; protocol method: #method<connection.close>(reply-code=320, reply-text=CONNECTION_FORCED - Node was put into maintenance mode, class-id=0, method-id=0)

我们如何拦截这个异常?

在 jms DefaultMessageListenerContainer 上添加 ExceptionListener 时效果很好

按照bean的配置:

<bean id="connectionAmqpFactorySrc" class="com.rabbitmq.client.ConnectionFactory">
    <property name="automaticRecoveryEnabled" value="true"/>
    <property name="networkRecoveryInterval" value="10000"/>
</bean>

<rabbit:connection-factory  id="rabbitConnectionFactory" connection-factory="connectionAmqpFactorySrc"
    username="guest" 
    password="guest" 
    addresses="XX.XX.XX.XX"
    cache-mode="CONNECTION" 
    virtual-host="/"  
    shuffle-addresses="true" />


<bean id="fixedBackOffRabbitMQ" class="org.springframework.util.backoff.FixedBackOff">
    <constructor-arg index="0" value="10000" />
    <constructor-arg index="1" value="3" />
</bean>
    
<bean id="myListener" class="org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer">
    <property name="connectionFactory" ref="rabbitConnectionFactory" />
    <property name="queueNames" value="MyQueue" />
    <property name="recoveryBackOff" ref="fixedBackOffRabbitMQ"/>
    <property name="channelTransacted" value="true"></property>
    <property name="errorHandler" ref="errorHandler"></property>
</bean>
    
<int-amqp:inbound-channel-adapter   channel="channelRmqMQ" 
        id="inboundChannelAdapter" 
        auto-startup="true" listener-container="myListener" error-channel="processChannel1" />

EDIT1

按照你的建议,我使用了这样的 bean 定义:

<bean id="listeners" class="java.util.ArrayList">
    <constructor-arg>
        <list>
            <ref bean="connectionAmqpListener" />
        </list>
    </constructor-arg>
</bean>

<bean id="rabbitConnectionFactory"  class="org.springframework.amqp.rabbit.connection.CachingConnectionFactory">
    <constructor-arg value="connectionAmqpFactorySrc"/>
    <property name="username" value="guest"/>
    <property name="password" value="guest"/>
    <property name="addresses" value="XX.XX.XX.XX"/>
    <property name="cacheMode" value="CONNECTION"/>
    <property name="virtualHost" value="/"/>
    <property name="shuffleAddresses" value="true"/>
    <property name="connectionListeners" ref="listeners"/>
</bean>

ConnectionAmqpListener.java

public class ConnectionAmqpListener implements ConnectionListener {
    
    private final Log LOG = LogFactory.getLog(ConnectionAmqpListener.class);
    
    public ConnectionAmqpListener() {
        super();
    }
    
    public void onCreate(Connection connection) {
        System.out.println("Open connection");
    }
    
    public void onClose(Connection connection) {
        System.out.println("Connection is closed");
    }
    
    public void onShutDown(ShutdownSignalException signal) {
        System.out.println("Connection is shutdown");
        System.exit(-1);
    }
}   

这很好,当我停止代理时,方法onShutDown 被调用。

但是如果我重新启动我的进程(代理关闭),我在日志文件中没有任何消息并且进程停止。

如果连接失败,您对如何获取信息有什么建议吗?

结束编辑1

感谢您的帮助

问候,

埃里克

【问题讨论】:

    标签: spring-integration spring-rabbit spring-integration-amqp


    【解决方案1】:

    ConnectionFactory:

    void addConnectionListener(ConnectionListener listener)
    

    那个回调有这个钩子:

    /**
     * Called when a connection is force closed.
     * @param signal the shut down signal.
     * @since 2.0
     */
    default void onShutDown(ShutdownSignalException signal) {
    }
    

    【讨论】:

    • 你不能。将&lt;bean id="connectionFactory" class="org.springframework.amqp.rabbit.connection.CachingConnectionFactory"/&gt; 与适当的属性设置一起使用,包括提到的connectionListeners 或切换到Java & Annotation 配置。
    • 抱歉,我删除了我之前的评论,因为我找到了方法。添加这个块我捕获了异常。 &lt;bean class="org.springframework.beans.factory.config.MethodInvokingFactoryBean"&gt; &lt;property name="targetObject" ref="rabbitConnectionFactory" /&gt; &lt;property name="targetMethod" value="addConnectionListener" /&gt; &lt;property name="arguments" ref="connectionAmqpListener" /&gt; &lt;/bean&gt; 但是当我在 RabbitMQ 上重新启动我的进程时使用这种语法我没有得到任何错误?你认为我需要使用你的方法吗?还是添加其他监听器?
    猜你喜欢
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2020-07-04
    • 2012-10-16
    • 1970-01-01
    • 2020-05-26
    相关资源
    最近更新 更多