【问题标题】:Node Js / Typescript - AMQP Consumer(Node Js / Typescript - AMQP 消费者)
【发布时间】:2022-02-16 11:35:22
【问题描述】:

我第一次尝试使用 node.js/typescript,但在为 RabbitMQ 队列创建消费者时遇到了一些麻烦。

代码:

const amqp = require('amqp');

// RABBITMQ_* values are expected to be defined elsewhere in the file.
const connection = amqp.createConnection({url: "amqp://" + RABBITMQ_USER + ":" + RABBITMQ_PASSWORD + "@" + RABBITMQ_HOST + ":" + RABBITMQ_PORT + RABBITMQ_VHOST});

// Surface connection-level failures instead of leaving them undiagnosed.
connection.on('error', function (err) {
    console.error('AMQP connection error:', err);
});

// Once the connection is ready: open the exchange, open the queue, bind the
// queue to the exchange, and then start consuming from the queue.
connection.on('ready', function() {
    connection.exchange(RABBITMQ_WORKER_EXCHANGE, function (exchange) {
        connection.queue(RABBITMQ_QUEUE, function (queue) {
            queue.bind(exchange, function() {
                // subscribe() registers the consumer callback; publishing is an
                // exchange operation and never delivers messages to this callback.
                queue.subscribe(function (message) {
                    console.log('subscribed to queue');
                    // NOTE(review): assumes the producer sends an escaped JSON
                    // string in message.data — confirm against the publisher.
                    let encoded_payload = unescape(message.data);
                    let payload = JSON.parse(encoded_payload);
                    console.log('Received a message:');
                    console.log(payload);
                })
            })
        })
    })
})

它似乎已经连接到 amqp 服务器,并且没有抛出任何错误,但它只是停在那里,不消费任何消息。有没有我遗漏的步骤?

任何帮助将不胜感激, 谢谢。

【问题讨论】:

    标签: node.js typescript amqp


    【解决方案1】:

    这是我基于 amqp 的 JS 教程的解决方案。 https://www.rabbitmq.com/tutorials/tutorial-three-javascript.html

    可能不符合 TypeScript 标准,如果有更好的方法,请随时纠正我。

    #!/usr/bin/env node
    
    require('dotenv').config();
    import amqp = require('amqplib/callback_api');
    import db = require('./database');
    
    // Connect to the broker described by the RABBITMQ_* environment variables,
    // declare the exchange and queue, bind them, and consume messages one at a
    // time, storing each message in the database before acknowledging it.
    amqp.connect({
        protocol: process.env.RABBITMQ_PROTOCOL,
        hostname: process.env.RABBITMQ_HOST,
        port: process.env.RABBITMQ_PORT,
        username: process.env.RABBITMQ_USER,
        password: process.env.RABBITMQ_PASSWORD,
        vhost: process.env.RABBITMQ_VHOST
    }, function(err, conn) {
        // Fail loudly: without this check a failed connect surfaces later as a
        // confusing TypeError on an undefined connection.
        if (err) {
            throw err;
        }
        conn.createChannel(function (err, ch) {
            if (err) {
                throw err;
            }
            // set exchange that is being used
            ch.assertExchange(process.env.RABBITMQ_WORKER_EXCHANGE, 'direct', {durable: true});
            // set queue that is being used
            ch.assertQueue(process.env.RABBITMQ_QUEUE, {durable: true}, function (err, q) {
                if (err) {
                    throw err;
                }
                console.log(" [*] Waiting for messages in %s. To exit press CTRL+C", q.queue);
                // bind the queue to the exchange
                ch.bindQueue(q.queue, process.env.RABBITMQ_WORKER_EXCHANGE, '');
                // Limit unacknowledged deliveries to one so messages really are
                // handled one at a time.
                ch.prefetch(1);
                // consume from the queue, one message at a time.
                ch.consume(q.queue, function (msg) {
                    // The callback receives null when the broker cancels the consumer.
                    if (msg === null) {
                        console.warn("Consumer was cancelled by the broker");
                        return;
                    }
                    const body = msg.content.toString();
                    console.log("Message received: %s", body);
                    //save message to db
                    db.store(body).then(function() {
                        //acknowledge receipt of message to amqp
                        console.log("Acknowledging message");
                        // Acknowledge only this delivery; allUpTo=true would also
                        // ack messages whose store has not finished yet.
                        ch.ack(msg);
                    }, function (storeErr) {
                        console.error("Failed to store message:", storeErr);
                        // Requeue so the message is not lost. Consider a dead-letter
                        // exchange if a message can fail permanently.
                        ch.nack(msg, false, true);
                    });
                }, {noAck: false});
            });
        });
    });
    

    【讨论】:

      【解决方案2】:
      import * as Amqp from "amqp-ts";
      
      
      const conn = new Amqp.Connection("amqp://localhost");
      const targetExchange = conn.declareExchange("ExchangeName");
      const targetQueue = conn.declareQueue("QueueName");
      targetQueue.bind(targetExchange);

      // Print the content of every message delivered to the queue.
      const logReceived = (received) => {
          console.log("Message received: " + received.getContent());
      };
      targetQueue.activateConsumer(logReceived);

      // This first message may not be received: it can be sent before the
      // queue, binding or consumer exist.
      const earlyMessage = new Amqp.Message("Test");
      targetExchange.send(earlyMessage);

      conn.completeConfiguration().then(() => {
          // This message will be received, because everything declared above
          // for this connection exists by now.
          const lateMessage = new Amqp.Message("Test2");
          targetExchange.send(lateMessage);
      });
      

      【讨论】:

        猜你喜欢
        • 2011-09-08
        • 2023-03-16
        • 1970-01-01
        • 1970-01-01
        • 1970-01-01
        • 1970-01-01
        • 1970-01-01
        • 1970-01-01
        • 2015-02-28
        相关资源
        最近更新 更多