【问题标题】:Spring-amqp -The last message in queue remain unacknowledged until I close the serverSpring-amqp - 队列中的最后一条消息保持未确认,直到我关闭服务器
【发布时间】:2016-11-27 16:21:53
【问题描述】:

我是 spring-amqp 的新手。我正在尝试手动确认消息而不是使用自动确认。

我看到管理控制台中的最后一条消息未被确认。

image for unacked message in managemnet console. 但是队列是空的。

一旦我停止服务器,最后一条消息就会得到确认。我该如何处理以及如何在日志中打印未被确认的消息 ID/信息。

这是我已经实现的代码。

RabbitConfig.java:

公共类 RabbitMQConfig {

final static String queueName = "spring-boot";

@Bean
Queue queue() {
    return new Queue(queueName, true,false,false,null);
}

@Bean
TopicExchange exchange() {
    return new TopicExchange("spring-boot-exchange");
}

@Bean
Binding binding(Queue queue, TopicExchange exchange) {
    return BindingBuilder.bind(queue).to(exchange).with(queueName);
}

@Bean
SimpleMessageListenerContainer container(ConnectionFactory connectionFactory,
                                         MessageListenerAdapter listenerAdapter) {
    SimpleMessageListenerContainer container = new SimpleMessageListenerContainer();
    container.setConnectionFactory(connectionFactory);
    container.setQueueNames(queueName);
    container.setMessageListener(listenerAdapter);
    container.setAcknowledgeMode(AcknowledgeMode.MANUAL);
    return container;
}

@Bean
Consumer receiver() {
    return new Consumer();
}

@Bean
MessageListenerAdapter listenerAdapter(Consumer receiver) {
    return new MessageListenerAdapter(receiver, "receiveMessage");
}

Consumer.java

公共类 Consumer 实现 ChannelAwareMessageListener{

@RabbitListener(queues = "spring-boot")
public void receiveMessage(String message, Channel channel, @Header(AmqpHeaders.DELIVERY_TAG) long tag)
        throws IOException, InterruptedException {
    Thread.sleep(500);
    channel.basicAck(tag, true);
    System.out.println(tag + "received");
}

@Override
public void onMessage(Message arg0, Channel arg1) throws Exception {
    // TODO Auto-generated method stub

}

生产者端点:

@RestController 公共类 HelloController {

private final RabbitTemplate rabbitTemplate;

public HelloController(RabbitTemplate rabbitTemplate) {
    this.rabbitTemplate = rabbitTemplate;

}

// Call this end point from the postman or the browser then check in the
// rabbitmq server
@GetMapping(path = "/hello")
public String sayHello() throws InterruptedException {
    // Producer operation
    for (int i = 0; i < 100; i++) {
        Thread.sleep(500);
        rabbitTemplate.convertAndSend(RabbitMQConfig.queueName, "Hello World");
    }
    return "hello";
}

@GetMapping(path = "/hellotwo")
public String sayHellotwo() throws InterruptedException {
    // Producer operation
    for (int i = 0; i < 50; i++) {
        Thread.sleep(500);
        rabbitTemplate.convertAndSend(RabbitMQConfig.queueName, "SEcond message");

    }
    return "hellotwo";
}

【问题讨论】:

    标签: rabbitmq spring-amqp


    【解决方案1】:

    你有两个监听器容器; container bean 和一个由框架为 @RabbitListener 创建的。

    如果不自己运行测试,我不完全确定会发生什么,但我怀疑问题是您尝试从简单的 MessageListenerAdapter 调用 receiveMessage

    该适配器仅设计用于调用具有一个参数的方法(从Message 转换而来)。此外,该适配器不知道如何映射 @Header 参数。我怀疑交付失败,并且由于您使用的是手动确认,因此由于未确认的交付和默认的 qos (1),不再尝试向该容器交付。

    你不需要你的container bean;而是配置消息侦听器容器工厂以设置 ack 模式。见the documentation

    如果您是 spring-amqp 的新手;为什么你认为你需要手动确认?默认模式(自动)意味着容器将为您确认/确认(NONE 是传统的兔子自动确认)。在 Spring 中使用手动确认并不常见。

    【讨论】:

    • 谢谢加里。该解决方案对我有用。我正在使用手动确认来确保即使消费者因任何原因死亡并且消息被重新排队,消息也不会丢失。
    • 那是不必要的 - AUTO ack 模式不是本机 rabbitmq 自动确认(即 NONE)。使用AUTO,只有监听器正常退出,容器才会确认消息;如果抛出异常,它将取消消息;如果消费者死亡,代理将自动重新排队消息。
    猜你喜欢
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2020-01-10
    • 2017-01-17
    • 1970-01-01
    • 2017-01-23
    • 1970-01-01
    • 2012-05-07
    相关资源
    最近更新 更多