【发布时间】:2020-02-17 11:48:41
【问题描述】:
在我的接收器处理特定数量的消息后,我需要停止 Rabbit 侦听器并调用一个方法,之后再次启用侦听器。
@RabbitListener(...)
public void sink(Message msg) {
processMsg();
if (condition) {
rabbitListenerEndpointRegistry.stop();
doTask();
rabbitListenerEndpointRegistry.start();
}
}
不幸的是,如果我在接收器方法中停止侦听器,则事务将失败并且消息将返回队列。我正在寻找一种在事务完成且侦听器未保存任何消息后调用方法的方法。
- 接收消息并处理消息
- 完成交易并释放消息我不想在此之后收到任何新消息
- 如果满足条件,则停止侦听器
- 执行长时间运行的任务
- 启动监听器
我无法转到手动事务管理,因为它需要对我的代码进行太多更改,并且在执行我的自定义任务时我无法保存任何消息,因为这是一项长时间运行的任务,我希望其他工作人员处理这段时间的消息。
Rabbit 工厂配置中的setAfterReceivePostProcessors 和setAdviceChain 在我的情况下不起作用,因为在这两种情况下,当侦听器持有消息时都会调用该方法。
【问题讨论】:
-
停止和启动容器的目的是什么?如果容器并发为 1,则在监听器退出之前,您不会收到任何新消息。
-
@GaryRussell 发布消息后我想运行的任务运行了很长时间,在此期间我不想持有或接收任何新消息。
-
如果您将预取设置为 1,您将不会“保留或接收”任何新消息;但也许您根本不应该使用侦听器容器 - 请参阅我的答案。
-
@GaryRussell Prefetch 无济于事,因为我想完成事务-> 停止侦听器-> 运行任务-> 启动侦听器。我无法在事务结束时停止侦听器,因为消息将返回到队列中,这会给我带来其他问题。我已经看到了您的回答,但我更愿意在对侦听器进行任何重大更改之前找到解决方法。
-
你只是在手头的任务中使用了错误的技术——你把事情复杂化了。我不能再帮忙了。
标签: java spring rabbitmq spring-amqp spring-rabbit