【问题标题】:RabbitMQ RPC using Node.js使用 Node.js 的 RabbitMQ RPC
【发布时间】:2017-09-08 01:49:39
【问题描述】:

我使用 RabbitMQ 在 Node.js 中实现 RPC。

我按照教程,我为每个客户端声明相同的队列名称为'rpc_client'为了高权限,这里是client.js在服务器中调用函数:

const amqp = require('amqplib');

async function client(){
    let args = process.argv.slice(2);
    let corr = generateUuid();
    let num = parseInt(args[0]);
    try {
        let conn = await amqp.connect('amqp://127.0.0.1');
        let ch = await conn.createChannel();
        let q = await ch.assertQueue('rpc_client');

        console.log(' [x] Requesting fib(%d)', num);
        console.log(q.queue)

        await ch.consume(q.queue,msg=>{
            if (msg.properties.correlationId == corr) {
                console.log(' [.] Got %s', msg.content.toString());
                ch.ack(msg)
                setTimeout(function() { conn.close(); process.exit(0) }, 5500);
            }
        },{noAck:false});

        ch.sendToQueue('rpc_server',
        new Buffer(num.toString()),
        { correlationId: corr, replyTo: q.queue });

    } catch(err){
        console.error(err);
    }
}

function generateUuid() {
    return Math.random().toString() +
           Math.random().toString() +
           Math.random().toString();
}

client();

但我发现当我一次运行多个客户端时,在前一个客户端的连接关闭之前,后一个客户端不会运行消费回调(从服务器获取答案并打印它)。例如第二个客户端将得到答案并打印它,它的连接将在 5500 毫秒内关闭,第二个客户端必须等待第一个客户端关闭并打印答案,然后再等待 5500 毫秒关闭。

那么为什么会这样呢?因为队列可以同时消费两个worker中的两个按摩。

这里是server.js

async function server(){
    try {
        let conn = await amqp.connect('amqp://127.0.0.1');
        let ch = await conn.createChannel();
        process.once('SIGINT',()=>conn.close());

        let q = await ch.assertQueue('rpc_server');
        ch.prefetch(1);
        console.log(' [x] Awaiting RPC requests');

        await ch.consume(q.queue,msg=>{
            let n = parseInt(msg.content.toString());

            console.log(" [.] fib(%d)", n);

            let r = fibonacci(n);

            ch.sendToQueue(msg.properties.replyTo,
            new Buffer(r.toString()),
            {correlationId: msg.properties.correlationId});

            ch.ack(msg);
        },{noAck:false});


    } catch(err) {
        console.error(err);
    }
}

server();

function fibonacci (n , ac1 = 1 , ac2 = 1) {
    if( n <= 1 ) {return ac2};

    return fibonacci (n - 1, ac2, ac1 + ac2);
}

【问题讨论】:

    标签: node.js rabbitmq rpc


    【解决方案1】:

    如果队列名称相同,那么它就是同一个队列。不是两个同名的队列。在这种情况下,客户端按顺序而不是并行获取消息是有意义的。

    所以尝试使用不同的队列名称,它应该可以工作。

    【讨论】:

    • 但是为什么同一个队列的服务器可以并行运行呢?参考rabbitmq.com/tutorials/tutorial-two-javascript.html
    • 多个消费者订阅同一个队列。在这种情况下,他们将收到备用消息。就像奇数的会成为第一个消费者,然后偶数会成为第二个。此处提供有关此模式的更多详细信息:rabbitmq.com/tutorials/tutorial-two-javascript.html
    • 对了,所以我的问题,有两个客户端,他们调用服务器中的函数,然后服务器会产生两个消息回rpc_client队列,所以两个客户端可以处理他们并行。但他们没有,为什么?
    • @laoqiren 我也有同样的问题。你找到解决办法了吗?
    【解决方案2】:

    据我所知,您必须将预取更改为大于 1 的数字。

    basic.qos (prefetch) 方法允许你限制未确认的数量 消费时通道(或连接)上的消息。

    所以问题出在这个参数上。您的频道将等待您的第一个请求得到处理和确认。

    【讨论】:

      猜你喜欢
      • 1970-01-01
      • 1970-01-01
      • 2015-04-15
      • 1970-01-01
      • 1970-01-01
      • 2016-01-21
      • 1970-01-01
      • 2020-11-15
      • 1970-01-01
      相关资源
      最近更新 更多