【问题标题】:Spring RabbitListener invoke a method after a message is sentSpring RabbitListener 在发送消息后调用方法
【发布时间】:2020-02-17 11:48:41
【问题描述】:

在我的接收器处理特定数量的消息后,我需要停止 Rabbit 侦听器并调用一个方法,之后再次启用侦听器。

@RabbitListener(...)
public void sink(Message msg) {
    processMsg();

    if (condition) {
        rabbitListenerEndpointRegistry.stop();
        doTask();
        rabbitListenerEndpointRegistry.start();
    }
}

不幸的是,如果我在接收器方法中停止侦听器,则事务将失败并且消息将返回队列。我正在寻找一种在事务完成且侦听器未保存任何消息后调用方法的方法。

  1. 接收消息并处理消息
  2. 完成交易并释放消息我不想在此之后收到任何新消息
  3. 如果满足条件,则停止侦听器
  4. 执行长时间运行的任务
  5. 启动监听器

我无法转到手动事务管理,因为它需要对我的代码进行太多更改,并且在执行我的自定义任务时我无法保存任何消息,因为这是一项长时间运行的任务,我希望其他工作人员处理这段时间的消息。

Rabbit 工厂配置中的setAfterReceivePostProcessorssetAdviceChain 在我的情况下不起作用,因为在这两种情况下,当侦听器持有消息时都会调用该方法。

【问题讨论】:

  • 停止和启动容器的目的是什么?如果容器并发为 1,则在监听器退出之前,您不会收到任何新消息。
  • @GaryRussell 发布消息后我想运行的任务运行了很长时间,在此期间我不想持有或接收任何新消息。
  • 如果您将预取设置为 1,您将不会“保留或接收”任何新消息;但也许您根本不应该使用侦听器容器 - 请参阅我的答案。
  • @GaryRussell Prefetch 无济于事,因为我想完成事务-> 停止侦听器-> 运行任务-> 启动侦听器。我无法在事务结束时停止侦听器,因为消息将返回到队列中,这会给我带来其他问题。我已经看到了您的回答,但我更愿意在对侦听器进行任何重大更改之前找到解决方法。
  • 你只是在手头的任务中使用了错误的技术——你把事情复杂化了。我不能再帮忙了。

标签: java spring rabbitmq spring-amqp spring-rabbit


【解决方案1】:

消息侦听器容器似乎不适合您的应用程序;你需要一个“拉”模型。

您可以使用rabbitTemplate.receive()receiveAndConvert() 方法之一按需提取消息。

如果您在事务之外使用它们,消息将立即被确认;如果您希望在进程完成后确认消息,请在事务中运行它。

【讨论】:

    【解决方案2】:

    如果条件满足,您可以启动一个新线程,这将停止侦听器并执行某些任务并启动侦听器。虽然不能保证新线程会在原始监听器消费任何新消息之前停止监听器。但它可以大致工作。

    您可以拥有一个新队列,而不是手动启动线程,您可以在其中发布消息(例如启动/停止),并且新侦听器可以在将指令消息放入新队列(例如启动或停止)时停止原始侦听器.

    第二种方法:

    您可以尝试另一种我认为应该可行的方法,而不是将侦听器 setMaxConcurrentConsumers 停止为 0,这应该禁用侦听器,不允许它在不停止侦听器的情况下消耗更多消息。在完成您的任务后将其更改为原始值,我猜在您的情况下它是 1。

    【讨论】:

      猜你喜欢
      • 1970-01-01
      • 1970-01-01
      • 2021-04-27
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 2016-08-23
      • 1970-01-01
      相关资源
      最近更新 更多