【发布时间】:2018-01-21 15:09:49
【问题描述】:
我正在构建一种微服务应用程序并使用 RabbitMQ 在我的服务之间进行通信。
我有一个 nodeJS 应用程序,它应该从 RabbitMQ 接收消息并在特定消息进入时执行命令。所以下面是以下代码的作用:
- 连接到 RabbitMQ
- 收听
symfony_messages队列 - 如果收到由
product.created标识的消息,脚本将使用来自child_process的spawn执行特定命令。
我的问题是:有时,我会“重新启动”我的脚本。我怎么能确定在重新启动脚本的那一刻不是在处理事件的过程中?如何确定进程不会在生成进程之前消耗消息并停止?
我想到的可能的解决方案是:
- 向 nodeJS 进程发送一个信号,告诉他“处理最后一条消息并停止”。 但我怎样才能发出这样的信号?
这里是代码(如果您已经收到问题,则无需阅读):
const amqp = require('amqplib/callback_api')
const { spawn } = require('child_process')
amqp.connect('amqp://guest:guest@127.0.0.1:5672', (err, conn) => {
if (err) {
console.log(err)
return
}
conn.createChannel((err, channel) => {
let q = 'symfony_messages'
channel.assertQueue(q, {
durable: false
})
console.log(" [*] Waiting for messages in %s. To exit press CTRL+C", q);
channel.consume(q, (msg) => {
let event = JSON.parse(msg.content.toString())
if (event.name === 'product.created') {
console.log('Indexing order...')
let cp = spawn('php', [path.join(__dirname, '..', '..', 'bin', 'console'), 'elastic:index:orders', event.payload.product_id])
cp.stdout.on('data', (data) => {
console.log(`stdout: ${data}`);
})
cp.stderr.on('data', (data) => {
console.log(`stderr: ${data}`);
})
cp.on('close', (code) => {
console.log(`child process exited with code ${code}`);
})
}
}, {noAck: true});
})
})
【问题讨论】: