【问题标题】:AMQP 1.0 Temporary Queues in Node.js with rheaNode.js 中的 AMQP 1.0 临时队列与 rhea
【发布时间】:2019-03-23 08:56:51
【问题描述】:

我使用 RabbitMQ 已经有一段时间了。我有几个微服务使用与Rabbit tutorials 中描述的非常接近的基本 RPC 机制运行。我正在尝试使用 rhea 切换到 AMQP 1.0,因为我需要使用 Amazon MQ。但是,我仍然坚持复制这个简单的模式:

ch.assertQueue('', {exclusive: true}, function(err, q) {
 let corr = //some UUID
   ch.consume(q.queue, function(msg) {
     /* */
   });

ch.sendToQueue('rpc_queue',
      "TEST2",
      { correlationId: corr, replyTo: q.queue });
    });
})

我没有从 rhea 得到的是可能有临时队列(与客户端连接相关),然后将“replyTo”回复到这些队列。

我试过了:

client.open_receiver({
    source: { address: "rpc:callback", expiry_policy: "connection-close" }
  });

使用expiry_policy,但它不起作用。我正在尝试使用 AMQP 1.0 插件然后使用 Apache ActiveMQ 的 RabbitMQ。

重点是,我想...

  1. 有一个临时(独占)队列,当客户端连接断开时会自动断开。
  2. 使用该临时队列(我可以手动为其分配临时名称,这不是重点)来处理回复。

但是,我既无法获得临时队列(AMQP 0.9.1 中独有),也无法使用该名称来处理回复。

【问题讨论】:

  • 嗨,你解决了吗?可以分享一下吗?
  • @vrachlin 很久以前我做了我刚刚在下面发布的内容

标签: node.js rabbitmq activemq rpc amqp


【解决方案1】:

本质上归结为这个流程。从下往上阅读,因为这是事物的逻辑流程。您首先启动您的侦听器,该侦听器将创建具有随机名称的动态队列。然后,您从传入的context 打开您的发件人和回复队列的名称。

// 'conn' is the rhea connection you have already created

new Promise((resolve, reject) => {
    let replyToQueue;

    conn.on('message', (context) => {
        // you have received your message
        resolve(context.message);
    });

    conn.once('sendable', (context) => {
        // send a message with a reply_to header
        context.sender.send({
            reply_to: replyToQueue,
            body: 'some message content'
        });
    });

    conn.on('receiver_open', (context) => {
        // capture the name of that dynamically named queue here
        replyToQueue = context.receiver.source.address;
        conn.open_sender('queue://send.to');
    });

    // start listening to a dynamically named temporary queue
    conn.open_receiver({ source: { dynamic: true } });
}

【讨论】:

    【解决方案2】:
    const container = require("rhea");
    const _logger = require("pino")();
    const nanoid = require("nanoid");
    
    const init = ({ config, caller, resources, services, rpcs }) => {
      return new Promise((resolve, reject) => {
        let _rpcs = {};
        let _responses = {};
    
        const send = (sender, receiver, correlation_id, body) => {
          if (receiver.source.address) {
            sender.send({
              reply_to: receiver.source.address,
              correlation_id,
              body
            });
          }
        };
    
        container.on("connection_open", context => {
          //RPCS
          rpcs &&
            rpcs.forEach(sendTo => {
              let parts = sendTo.name.split(".");
              _rpcs[parts[0]] = _rpcs[parts[0]] ? _rpcs[parts[0]] : {};
    
              let sender = context.connection.open_sender(sendTo.name);
              let receiver = context.connection.open_receiver({
                source: { dynamic: true }
              });
    
              receiver.on("message", context => {
                let correlation_id = context.message.correlation_id;
                if (_responses[correlation_id]) {
                  let { resolve, reject } = _responses[correlation_id];
                  resolve(context.message.body);
                  delete _responses[correlation_id];
                }
              });
    
              _rpcs[parts[0]][parts[1]] = body =>
                new Promise((resolve, reject) => {
                  const correlation_id = nanoid();
                  _responses[correlation_id] = { resolve, reject };
                  send(sender, receiver, correlation_id, body);
                });
            });
    
          // SERVICES
          services &&
            services.forEach(service => {
              let receiver = context.connection.open_receiver({
                source: `${resources.name}.${service.name}`,
                //credit_window: 1, //service.prefetch || 500,
                autoaccept: false
              });
    
              receiver.on("message", async context => {
                let request = context.message;
                let reply_to = request.reply_to;
                let payload = request.body;
    
                try {
                  let response = {
                    to: reply_to,
                    body: await caller(service.responder)({ payload })
                  };
                  if (request.correlation_id) {
                    response.correlation_id = request.correlation_id;
                  }
                  context.connection.send(response);
                  context.delivery.accept();
                } catch (error) {
                  _logger.error(error);
                  context.delivery.reject();
                }
              });
            });
        });
    
        container.on("receiver_open", context => {
          resolve(_rpcs);
        });
    
        container.on("connection_error", error => _logger.error(error));
    
        container.connect(config.getResource("amqp"));
      });
    };
    
    module.exports = { init };
    
    

    【讨论】:

      猜你喜欢
      • 2012-09-12
      • 2020-05-19
      • 1970-01-01
      • 2020-06-14
      • 2011-05-01
      • 2011-06-09
      • 2019-06-18
      • 2018-08-08
      • 2012-09-23
      相关资源
      最近更新 更多