【问题标题】:Spring amqp consumer not re-connecting to queue after network failure网络故障后Spring amqp消费者未重新连接到队列
【发布时间】:2019-04-27 10:09:53
【问题描述】:

我们有一个 spring 应用程序,它有一个动态队列侦听器连接到 rabbitmq 中的队列。假设我总共有 5 个侦听器使用者连接到从我的 spring 应用程序到 rabbitmq 的 5 个队列。

现在,如果每次发生网络波动/故障,我的 5 个连接队列中的第一个将停止重试 rabbitmq。

我通过 spring-amqp 类调试了代码,发现在创建与 rabbitmq 的连接时(发生网络故障时),它无法连接到它并抛出 org.springframework.amqp.AmqpIOException 未处理的特定异常在重试函数中,以便从重试队列列表中删除该队列。

我的主要课程:

@Slf4j
@SpringBootApplication(exclude = {ClientAutoConfiguration.class})
@EnableTransactionManagement
@EnableJpaRepositories(basePackages = "com.x.x.repositories")
@EntityScan(basePackages = "com.x.x.entities")
public class Main
{
    @PostConstruct
    void configuration()
    {
        TimeZone.setDefault(TimeZone.getTimeZone("UTC"));
    }

    /**
     * The main method.
     *
     * @param args the arguments
     */
    public static void main(String[] args)
    {
        ConfigurableApplicationContext context = SpringApplication.run(Main.class, args);

        RabbitMQListenerUtil queueRegisterUtil = context.getBean(RabbitMQListenerUtil.class);

        try
        {
            queueRegisterUtil.registerSpecifiedListenerForAllInstance();
        }
        catch (Exception e)
        {
            log.error(e.getMessage(), e);
        }
    }
}

用于创建 5 个消费者/侦听器的类


/**
 * The Class RabbitMQListenerUtil.
 */
@Component
@Slf4j
public class RabbitMQListenerUtil
{
    @Autowired
    private ApplicationContext applicationContext;


    public void registerSpecifiedListenerForAllInstance()
    {
        try
        {

            log.debug("New Listener has been register for instane name : ");
            Thread.sleep(5000);
            registerNewListener("temp1");
            registerNewListener("temp2");
            registerNewListener("temp3");
            registerNewListener("temp4");
            registerNewListener("temp5");
        }
        catch (Exception e)
        {

        }
    }

    /**
     * This method will add new listener bean for given queue name at runtime
     * 
     * @param queueName - Queue name
     * @return Configurable application context
     */
    public void registerNewListener(String queueName)
    {
        AnnotationConfigApplicationContext childAnnotaionConfigContext = new AnnotationConfigApplicationContext();
        childAnnotaionConfigContext.setParent(applicationContext);

        ConfigurableEnvironment environmentConfig = childAnnotaionConfigContext.getEnvironment();

        Properties listenerProperties = new Properties();
        listenerProperties.setProperty("queue.name", queueName + "_queue");
        PropertiesPropertySource pps = new PropertiesPropertySource("props", listenerProperties);
        environmentConfig.getPropertySources().addLast(pps);

        childAnnotaionConfigContext.register(RabbitMQListenerConfig.class);
        childAnnotaionConfigContext.refresh();
    }

}

为队列消费者创建动态监听器的类

/**
 * The Class RabbitMQListenerConfig.
 */
@Configuration
@Slf4j
@EnableRabbit
public class RabbitMQListenerConfig
{

    /** The Constant ALLOW_MESSAGE_REQUEUE. */
    private static final boolean ALLOW_MESSAGE_REQUEUE = true;

    /** The Constant MULTIPLE_MESSAGE_FALSE. */
    private static final boolean MULTIPLE_MESSAGE_FALSE = false;

    /**
     * Listen.
     *
     * @param msg the msg
     * @param channel the channel
     * @param queue the queue
     * @param deliveryTag the delivery tag
     * @throws IOException Signals that an I/O exception has occurred.
     */
    @RabbitListener(queues = "${queue.name}")
    public void listen(Message msg, Channel channel, @Header(AmqpHeaders.CONSUMER_QUEUE) String queue, @Header(AmqpHeaders.DELIVERY_TAG) long deliveryTag) throws IOException
    {
        int msgExecutionStatus = 0;
        try
        {
            String message = new String(msg.getBody(), StandardCharsets.UTF_8);
            log.info(message);
        }
        catch (Exception e)
        {
            log.error(e.toString());
            log.error(e.getMessage(), e);
        }
        finally
        {
            ackMessage(channel, deliveryTag, msgExecutionStatus);
        }
    }

    /**
     * Ack message.
     *
     * @param channel the channel
     * @param deliveryTag the delivery tag
     * @param msgExecutionStatus the msg execution status
     * @throws IOException Signals that an I/O exception has occurred.
     */
    protected void ackMessage(Channel channel, long deliveryTag, int msgExecutionStatus) throws IOException
    {
        if (msgExecutionStatus == Constants.MESSAGE_DELETE_FOUND_EXCEPTION)
        {
            channel.basicNack(deliveryTag, MULTIPLE_MESSAGE_FALSE, ALLOW_MESSAGE_REQUEUE);
        }
        else
        {
            channel.basicAck(deliveryTag, MULTIPLE_MESSAGE_FALSE);
        }
    }

    /**
     * Bean will create from this with given name.
     *
     * @param name - Queue name- 
     * @return the queue
     */
    @Bean
    public Queue queue(@Value("${queue.name}") String name)
    {
        return new Queue(name);
    }

    /**
     * RabbitAdmin Instance will be created which is required to create new Queue.
     *
     * @param cf - Connection factory
     * @return the rabbit admin
     */
    @Bean
    public RabbitAdmin admin(ConnectionFactory cf)
    {
        return new RabbitAdmin(cf);
    }

}

应用日志:

https://pastebin.com/NQWdmdTH

我已经对此进行了多次测试,每次我的第一个连接队列都被停止连接。

==========================更新1============= ================

重新连接已停止消费者的代码: https://pastebin.com/VnUrhdLP

【问题讨论】:

    标签: java spring rabbitmq spring-amqp


    【解决方案1】:

    Caused by: java.net.UnknownHostException: rabbitmqaind1.hqdev.india

    您的网络有问题。

    【讨论】:

    • 是的,我提到在此期间会发生网络波动。所以会抛出这种类型的错误。它只是网络丢失了一段时间,它又回来了。但是为什么我们的消费者被阻止重新连接呢?
    • 另一个有不同异常的实例,如下所示。 pastebin.com/wsTRXapH 在上述情况下,它停止重试连接。
    • Caused by: com.rabbitmq.client.ShutdownSignalException: channel error; protocol method: #method<channel.close>(reply-code=404, reply-text=NOT_FOUND - home node 'rabbit@MICROSVC1A1' of durable queue 'prodqueue.1535968874507_queue' in vhost '/' is down or inaccessible, class-id=50, method-id=10) 您正在访问一个非镜像队列并且拥有该队列的节点已关闭。默认情况下,我们在尝试 3 次后停止尝试;你可以增加容器的declarationRetries属性和failedDeclarationRetryInterval(默认5000ms)。
    • 我们所有的队列都是镜像队列,这个队列和其他50多个队列都在那边,除了这个队列,其他队列也连接成功。仍然需要 declarationRetries 和 failedDeclarationRetryInterval 属性吗?我们也在使用 DirectMessageListenerContainer。
    • 否;您不需要使用直接容器的declarationRetries,它将永远尝试。好吧,错误很明显is down or inaccessible 很明显,您连接的节点无法访问队列 - 也许您有一个需要修复的裂脑集群。如果没有,您应该询问 rabbitmq-users Google 组中的 RabbitMQ 人员是什么导致了这种情况。
    猜你喜欢
    • 2015-03-14
    • 1970-01-01
    • 2012-01-29
    • 2019-08-06
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2021-09-27
    • 2021-06-16
    相关资源
    最近更新 更多