【发布时间】:2017-09-08 01:49:39
【问题描述】:
我使用 RabbitMQ 在 Node.js 中实现 RPC。
我按照教程,我为每个客户端声明相同的队列名称为'rpc_client'为了高权限,这里是client.js在服务器中调用函数:
const amqp = require('amqplib');
async function client(){
let args = process.argv.slice(2);
let corr = generateUuid();
let num = parseInt(args[0]);
try {
let conn = await amqp.connect('amqp://127.0.0.1');
let ch = await conn.createChannel();
let q = await ch.assertQueue('rpc_client');
console.log(' [x] Requesting fib(%d)', num);
console.log(q.queue)
await ch.consume(q.queue,msg=>{
if (msg.properties.correlationId == corr) {
console.log(' [.] Got %s', msg.content.toString());
ch.ack(msg)
setTimeout(function() { conn.close(); process.exit(0) }, 5500);
}
},{noAck:false});
ch.sendToQueue('rpc_server',
new Buffer(num.toString()),
{ correlationId: corr, replyTo: q.queue });
} catch(err){
console.error(err);
}
}
function generateUuid() {
return Math.random().toString() +
Math.random().toString() +
Math.random().toString();
}
client();
但我发现当我一次运行多个客户端时,在前一个客户端的连接关闭之前,后一个客户端不会运行消费回调(从服务器获取答案并打印它)。例如第二个客户端将得到答案并打印它,它的连接将在 5500 毫秒内关闭,第二个客户端必须等待第一个客户端关闭并打印答案,然后再等待 5500 毫秒关闭。
那么为什么会这样呢?因为队列可以同时消费两个worker中的两个按摩。
这里是server.js:
async function server(){
try {
let conn = await amqp.connect('amqp://127.0.0.1');
let ch = await conn.createChannel();
process.once('SIGINT',()=>conn.close());
let q = await ch.assertQueue('rpc_server');
ch.prefetch(1);
console.log(' [x] Awaiting RPC requests');
await ch.consume(q.queue,msg=>{
let n = parseInt(msg.content.toString());
console.log(" [.] fib(%d)", n);
let r = fibonacci(n);
ch.sendToQueue(msg.properties.replyTo,
new Buffer(r.toString()),
{correlationId: msg.properties.correlationId});
ch.ack(msg);
},{noAck:false});
} catch(err) {
console.error(err);
}
}
server();
function fibonacci (n , ac1 = 1 , ac2 = 1) {
if( n <= 1 ) {return ac2};
return fibonacci (n - 1, ac2, ac1 + ac2);
}
【问题讨论】: