【问题标题】:NodeJS and RabbitMQ, how to be sure my message is processedNodeJS 和 RabbitMQ,如何确保我的消息得到处理
【发布时间】:2018-01-21 15:09:49
【问题描述】:

我正在构建一种微服务应用程序并使用 RabbitMQ 在我的服务之间进行通信。

我有一个 nodeJS 应用程序,它应该从 RabbitMQ 接收消息并在特定消息进入时执行命令。所以下面是以下代码的作用:

  1. 连接到 RabbitMQ
  2. 收听symfony_messages队列
  3. 如果收到由product.created 标识的消息,脚本将使用来自child_processspawn 执行特定命令。

我的问题是:有时,我会“重新启动”我的脚本。我怎么能确定在重新启动脚本的那一刻不是在处理事件的过程中?如何确定进程不会在生成进程之前消耗消息并停止?

我想到的可能的解决方案是:

  • 向 nodeJS 进程发送一个信号,告诉他“处理最后一条消息并停止”。 但我怎样才能发出这样的信号

这里是代码(如果您已经收到问题,则无需阅读):

const amqp = require('amqplib/callback_api')
const { spawn } = require('child_process')

amqp.connect('amqp://guest:guest@127.0.0.1:5672', (err, conn) => {

    if (err) {
        console.log(err)
        return
    }

    conn.createChannel((err, channel) => {
        let q = 'symfony_messages'

        channel.assertQueue(q, {
            durable: false
        })

        console.log(" [*] Waiting for messages in %s. To exit press CTRL+C", q);

        channel.consume(q, (msg) => {
            let event = JSON.parse(msg.content.toString())

            if (event.name === 'product.created') {
                console.log('Indexing order...')

                let cp = spawn('php', [path.join(__dirname, '..', '..', 'bin', 'console'), 'elastic:index:orders', event.payload.product_id])

                cp.stdout.on('data', (data) => {
                    console.log(`stdout: ${data}`);
                })

                cp.stderr.on('data', (data) => {
                    console.log(`stderr: ${data}`);
                })

                cp.on('close', (code) => {
                    console.log(`child process exited with code ${code}`);
                })
            }

        }, {noAck: true});
    })
})

【问题讨论】:

    标签: node.js rabbitmq


    【解决方案1】:

    成功处理消息后,在消息上使用 channel.ack(message) 函数不是一个很好的模式吗?您已将 noAck 选项设置为 true,但您可以使用 ACK 机制来确保消息仅在成功处理后才从队列中取出。

    同样,你可以使用 Nack 函数故意告诉 RabbitMQ 消息没有被处理,我通常在 process 函数错误处理程序(或 promise.catch)中这样做。

    我在将消息写入数据库的服务中使用了类似的机制。我只在消息写入数据库后才确认消息。在 RabbitMQ 中设置死信交换/队列也很有用,这样任何 Nacked 的消息都会在那里结束。然后,您可以检查这些消息并查看它们无法处理的原因(或在导致问题的错误条件得到解决后自动尝试重新处理。)

    【讨论】:

    • 太好了,我刚刚阅读了有关 Ack 和 Nack 功能的文档,它完全符合我的需求,谢谢!
    • 酷,还检查了死信功能,捕获无法处理的奇怪消息非常有用。你 nack 一条消息,然后它最终进入死信队列。您的流程可以继续进行,您可以更新算法以稍后处理这些消息。
    • 你能给我们举个例子吗?当消费者发送 Nack 时,我无法在 Publisher 端处理它...
    猜你喜欢
    • 2015-12-07
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2015-10-02
    • 2016-11-16
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多