【问题标题】:Making spring amqp consumer to stop consuming messages for a specified type of exception使 spring amqp 消费者停止消费指定类型异常的消息
【发布时间】:2020-10-25 01:21:01
【问题描述】:

我正在使用rabbitmq使用spring amqp。

我有一个用例,我的消费者依赖另一个系统 - X 可能会停机。我想要的是通过以下方式处理当 X 关闭时出现的异常 - 在 XDownException 上,我想停止处理来自队列的消息,这样我就不会在停机期间丢失这些消息并继续重新排队消息,直到我停止收到 XDownException。这样,我确信当 X 关闭时我不会丢失任何消息,然后在 X 启动时自动恢复。

Fifo 是必须的。侦听器在处理消息时抛出 XDownException。监听器现在不知道事务,但如果有帮助,我们可以让它知道事务。 但是我不想为每一种例外都这样做...... 有没有办法用 spring amqp 做到这一点?

另外,还有比这种方法更好的方法吗? X 出现时我没有活动。

【问题讨论】:

  • 问题 1:假设您正在使用消息驱动的 POJO 和 spring 侦听器容器,您是否期望在侦听器中处理接收到的消息期间出现 XDownException(以及可能的其他异常)?问题 2:您的侦听器交易感知吗?问题 3:FIFO(换句话说,保证顺序交付)对您来说是必须的吗?您的用例的解决方案将取决于这些问题的答案。请使用其他输入更新您的问题,以便其他人提供帮助。
  • 用这些细节更新了问题

标签: java spring rabbitmq spring-amqp spring-rabbit


【解决方案1】:

FIFO 要求规定您的容器设置中不能有超过一个并发消费者。假设此设置,您将在 POJO 方法中一一收到每条消息。除非完全处理此消息,否则不会传递下一条消息。

以下是您描述的用例在这种情况下的策略应该是什么。

  1. 确保在容器上将预取大小设置为 1 和手动确认模式。完整详情请访问Asynchronous Consumer
  2. 您的设置应如下所示
public class ExtendedListenerAdapter extends MessageListenerAdapter {
    @Override
    protected Object[] buildListenerArguments(Object extractedMessage, Channel channel, Message message) {
        return new Object[]{extractedMessage, channel, message};
    }
}


public class MyListener {
    public void handleMessage(Object object, Channel channel, Message message) throws IOException {
        try {
            processMessage(Object)
        } catch (XDownException xdex) {
            waitForXAvailability(object);
        } catch (OtherException oex) {
        } finally {
            channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
        }
    }
    
    private processMessage(Object object) /* throws All types of exceptions */ {
        // process the message as usual
    }
    
    private waitForXAvailability(Object object) {
        for (;;) {
            // add delay here, exponetial backoff delay recommend with upper bound
            // also add upper bounds on number of iteration you want to keep, it's infinte now
            try {
                processMessage(Object);
                return; // no exception means X is up again
            } catch (XDownException xdex) {
                // x is down, let the loop continue
            }
        }
    }
}

@Configuration
public class ExampleAmqpConfiguration {

    @Bean
    public SimpleMessageListenerContainer messageListenerContainer() {
        SimpleMessageListenerContainer container = new SimpleMessageListenerContainer();
        container.setConnectionFactory(rabbitConnectionFactory());
        container.setQueueName("some.queue");
        container.setPrefetchCount(1)
        container.setAcknowledgeMode(AcknowledgeMode.MANUAL)
        container.setMessageListener(myAdapter());
        return container;
    }

    @Bean
    public ConnectionFactory rabbitConnectionFactory() {
        CachingConnectionFactory connectionFactory =
            new CachingConnectionFactory("localhost");
        connectionFactory.setUsername("guest");
        connectionFactory.setPassword("guest");
        return connectionFactory;
    }

    @Bean
    public ExtendedListenerAdapter myAdapter() {
        ExtendedListenerAdapter adapter = new ExtendedListenerAdapter();
        listenerAdapter.setDelegate(myListener())
        return adapter;
    }
    
    @Bean
    public MyListener myListener() {
        return new MyListener();
    }
}

请调整以上大纲以最适合您的需求。以下链接应该为您提供更多有用的信息

  1. Spring RabbitMQ - using manual channel acknowledgement
  2. Spring AMQP - Receiving Messages

【讨论】:

  • 感谢您的回答...我看到我们正在重试进程消息,直到 X 再次备份。添加延迟有助于优化重试次数。我对答案投了赞成票,但我只是在想是否有更好的方法来解决这个问题。
  • 有更好的方法,但严格的 FIFO 要求使这里的情况变得复杂。由于没有明确通知 X 可用性,因此重试是识别 X 是否可用的唯一选项,这使情况变得更加复杂。如果有一个事件通知 X 再次启动,那么 pausing and resuming listener 也是一个选项。
猜你喜欢
  • 2017-09-23
  • 1970-01-01
  • 2022-10-23
  • 2019-04-16
  • 1970-01-01
  • 2020-08-11
  • 1970-01-01
  • 2016-12-27
  • 2015-11-25
相关资源
最近更新 更多