【问题标题】:How to read messages destructively from RabbitMq queue using SimpleMessageListenerContainer bean如何使用 SimpleMessageListenerContainer bean 从 RabbitMq 队列中破坏性地读取消息
【发布时间】:2016-10-22 11:16:44
【问题描述】:

我需要帮助来实现 SimpleMessageListenerContainer,它会在消息接收者读取后立即以非事务方式删除消息。

在我的情况下,无论事务成功或不成功,放入队列的消息都会挂在某处(不在队列中),并在从队列读取的每个操作中重复处理。因此,放入队列的所有其他消息仍然无法访问,并且每次仅重新处理第一个消息。

另一个奇怪的事情是,我看不到队列中的消息登陆/排队,即在 Rabbit 管理控制台上队列深度永远不会改变,而只是消息速率在每次写入队列时都会发生跳跃。

下面是我的 Java 配置的代码 sn-p。如果有人可以在这里指出错误,那将有很大帮助:-

@Configuration
@EnableJpaRepositories(basePackages={"com.xxx.muppets"})
public class MuppetMessageConsumerConfig {

private ApplicationContext applicationContext;
@Value("${rabbit.queue.name}")
private String queueName;

@Autowired
public void setApplicationContext(ApplicationContext applicationContext) {
    this.applicationContext = applicationContext;
}

@Bean
Queue queue() {
    return new Queue(queueName, false);
}

@Bean
TopicExchange exchange() {
    return new TopicExchange("spring-boot-exchange");
}

@Bean
Binding binding(Queue queue, TopicExchange exchange) {
    return BindingBuilder.bind(queue).to(exchange).with(queueName);
}

@Bean
MuppetMsgReceiver muppetMsgReceiver(){
    return new MuppetMsgReceiver();
}

@Bean
MessageListenerAdapter listenerAdapter(MuppetMsgReceiver muppetMsgReceiver){
    return new MessageListenerAdapter(muppetMsgReceiver, "receiveMessage");
}

@Bean
SimpleMessageListenerContainer container(ConnectionFactory connectionFactory, MessageListenerAdapter listenerAdapter) {
    SimpleMessageListenerContainer container = new SimpleMessageListenerContainer();
    container.setAcknowledgeMode(NONE);
    container.setConnectionFactory(connectionFactory);
    container.setQueueNames(queueName);
    container.setMessageListener(listenerAdapter);
    return container;
}
}

我的消息接收类如下:

public class MuppetMsgReceiver {

private String muppetMessage;

private CountDownLatch countDownLatch;

public MuppetMsgReceiver() {
    this.countDownLatch = new CountDownLatch(1);
}

public MuppetMsgReceiver(CountDownLatch latch) {
    this.countDownLatch = latch;        
    CountDownLatch getCountDownLatch() {
    return countDownLatch;
}

public void receiveMessage(String receivedMessage) {
    countDownLatch.countDown();
    this.muppetMessage = receivedMessage;
}

public String getMuppetMessage() {
    return muppetMessage;
}
}

此完整代码基于 Spring 的 getting started example,但由于队列中的非破坏性读取而没有帮助。

【问题讨论】:

    标签: rabbitmq spring-amqp spring-rabbit


    【解决方案1】:

    AcknowledgeMode=NONE 表示代理在消息发送 给消费者后立即确认消息,而不是在消费者收到消息时确认,因此您观察到的行为对我来说毫无意义。使用这种确认模式是很不寻常的,但是消息一发送就会消失。这可能是您的代码有问题。 TRACE 日志记录将帮助您了解正在发生的事情。

    【讨论】:

    • 我完全同意 Gary,这就是将 Ack Mode 设置为 None 的全部原因。我认为了解 SimpleMessageListenerContainer 的工作原理可能对我有用。我曾期望它仅在需要时才从队列中读取消息,但看起来接收者创建的 bean 会在消息发布到 Broker 后立即使用该消息。如果这种理解是正确的,您能否帮助我?如果是,所有已读消息保存/存储在哪里?那个储藏室在我看来是没有冲洗过的地方。
    • 它们不是“存储的”;它们被写入BlockingQueueConsumer 中的队列(其大小基于prefetchCount,以避免消费者跟不上时出现OOM 情况)。消费者线程删除每条消息并调用侦听器 bean;在这种情况下,不可能重新发送消息。
    • BlockingQueueConsumer 是由 RabbitMq 管理的,在这种情况下,我仍然不确定为什么我的应用程序会出现异常行为并且每次只能获取第一条消息。我仍然会用我的测试用例来看看和锻炼它。如果您知道这种(错误)行为的任何可能原因,请告诉我。
    • 没有。它在 SMLC 内部。正如我所说,打开 TRACE 调试。
    • 我碰巧深入研究了这个问题。这真的是我误解和实现消息接收器的方式。我希望仅在需要时才从队列接收消息,但是侦听器容器的实现不满足需求,因此出现了问题。现在通过解决方法解决了这个问题。然而,目前我遇到了另一个困惑,即消息是按 LIFO 顺序读取的,而不是队列中预期的 FIFO。是否可以配置任何方式,以便我仅在 FIFO 中接收消息?
    猜你喜欢
    • 1970-01-01
    • 2016-09-02
    • 2017-09-16
    • 1970-01-01
    • 2014-08-24
    • 1970-01-01
    • 1970-01-01
    • 2012-11-05
    • 2013-12-18
    相关资源
    最近更新 更多