【问题标题】:How do I have RabbitMQ redeliver unacknowledged messages?如何让 RabbitMQ 重新传递未确认的消息?
【发布时间】:2018-03-21 03:21:56
【问题描述】:

我正在按照以下教程进行操作: https://www.rabbitmq.com/tutorials/tutorial-two-java.html.

我这样启动 RabbitMQ 服务器:

docker pull rabbitmq
docker run -d --hostname my-rabbit-host --name my-rabbit -p 5672:5672 rabbitmq:3

来自教程:

使用此代码,我们可以确定即使您使用 CTRL+C 在处理消息时,不会丢失任何内容。很快 工人死亡后,所有未确认的消息将被重新传递。

我产生了两个消费者,当我 CTRL+C 其中一个消费者时,另一个正在运行的消费者收到原本发给前消费者的消息。如何在 CTRL+C 退出其中一位消费者后重新传递消息?

编辑:我现在正在通过“brew”安装 RabbitMQ,但我仍然看到同样的问题。

brew update
brew install rabbitmq
/usr/local/sbin/rabbitmq-server &

【问题讨论】:

  • 根据您的描述,它应该可以开箱即用。您是否放置了睡眠而不是 doWork 的东西并在睡眠期间停止了消费者? (显然,一旦消息被“basicAcked”,它就不会再次传递给任何消费者)。您确定其他消费者实际上正在消费吗?
  • 正确 - 我在 ack'ing 之前睡觉了。我在睡觉时按CTRL+C。我在消费者的 handleDelivery() 回调中打印消息,但它永远不会通过。
  • 如果两个消费者都在消费,他们接收消息是均匀的,还是总是一样的接收消息?如果始终是同一个使用者,则可能表明第二个使用者未正确配置(例如,未正确绑定到队列)。为了更加自信,将一些消息放入队列并确保它们同时消费。另外,使用rabbitmq:3-management docker image(+ forward 15672 并在浏览器中打开它,用户名是guest,密码是guest)来找出队列中消息的实际情况。
  • 消费者以循环方式接收消息。出于某种原因,当我通过 Docker 安装时,mgmt 插件看不到任何活动连接或队列。我继续通过“brew”安装,现在 mgmt 插件页面看到了活动。但是,在缺少 CTRL+C 的确认后,消息仍然没有被重新传递。

标签: rabbitmq


【解决方案1】:

没有必要在消费者代码中放置睡眠或类似的东西。在您提供的链接上,搜索以 Manual message acknowledgments 开头的段落并查看那里的代码。关键是不要确认消息。如果你将 autoACK 标志设置为 true,那么你可以调用任何你想要的,消息一收到就被确认。所以简单地不要设置那个标志,而且为了测试你可以注释掉channel.basicAck(envelope.getDeliveryTag(), false);这行,以免手动确认。所以当消费者退出时,消息仍会在队列中。

【讨论】:

  • 嗯,好像还是不行。如果您想查看代码,请告诉我。
  • @Phillip 当然,请将您的代码添加到问题中
【解决方案2】:

奇怪,RabbitMQ 开箱即用。

第 1 步,我启动了 RabbitMQ:

$ docker run -d --hostname my-rabbit-host --name my-rabbit -p 5672:5672 rabbitmq:3
$ docker ps
CONTAINER ID        IMAGE               COMMAND                  CREATED             STATUS              PORTS                                                   NAMES
e0c3257b8b49        rabbitmq:3          "docker-entrypoint.s…"   18 minutes ago      Up 14 minutes       4369/tcp, 5671/tcp, 25672/tcp, 0.0.0.0:5672->5672/tcp   my-rabbit

第 2 步,我发布了一条消息(顺便说一下,我尝试使用 Node.js。源代码请参见下面的附录):

$ node src/producer.js
Publisher:  TODO 1st

第三步,我一个接一个地启动了两个consumer(我的consumer是为了测试目的而设计的,不承认,所以RabbitMQ永远不会出队消息)。

消费者 1 会收到消息,而消费者 2 不会。

消费者1:

$ node src/consumer.js 
Consumer:  TODO 1st

消费者2:

$ node src/consumer.js 

第四步,当我通过 'Ctrl + c' 停止消费者 1 时,消费者 2 将立即收到来自 RabbitMQ 的消息:

消费者2:

$ node src/consumer.js 
Consumer:  TODO 1st

结论:基本上,在设置消费者时,我们需要告诉 RabbitMQ 在收到消费者的确认之前不要将消息出列。因此,如果消费者 1 在有机会确认消息之前因任何原因停止,RabbitMQ 会将消息重新传递给消费者 2。

附录

src/producer.js

var q = 'tasks'

function bail (err) {
  console.error(err)
  process.exit(1)
}

// Publisher
function publisher (conn) {
  conn.createChannel(onOpen)
  function onOpen (err, ch) {
    if (err != null) bail(err)
    ch.assertQueue(q)
    const msg = 'TODO 1st'
    ch.sendToQueue(q, Buffer.from(msg), { persistent: true })
    console.log('Publisher: ', msg)
  }
}

require('amqplib/callback_api')
  .connect('amqp://guest:guest@localhost', function (err, conn) {
    if (err != null) bail(err)
    publisher(conn)
  })

src/consumer.js

var q = 'tasks'

function bail (err) {
  console.error(err)
  process.exit(1)
}

// Consumer
function consumer (conn) {
  conn.createChannel(onOpen)
  function onOpen (err, ch) {
    if (err != null) bail(err)
    ch.assertQueue(q)
    ch.consume(q, function (msg) {
      if (msg !== null) {
        console.log('Consumer: ', msg.content.toString())
        // Commented out the line below, so RabbitMQ never dequeues a message
        // ch.ack(msg)
      }
    }, { noAck: false })
  }
}

require('amqplib/callback_api')
  .connect('amqp://guest:guest@localhost', function (err, conn) {
    if (err != null) bail(err)
    consumer(conn)
  })

【讨论】:

    猜你喜欢
    • 2016-05-09
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2015-10-02
    • 1970-01-01
    • 2015-11-24
    • 1970-01-01
    • 2016-04-07
    相关资源
    最近更新 更多