【问题标题】:Azure Node.js Running Service Bus Queue Messages in Worker RoleAzure Node.js 在辅助角色中运行服务总线队列消息
【发布时间】:2012-07-28 00:02:05
【问题描述】:

如何设置辅助角色以使用 windows azure sdk 侦听服务器总线队列消息?

目前我在 server.js 工作角色中有这个来监听队列消息

var http = require('http')
    , config = require('./config')
    , azure = require('azure')
    , uuid = require('node-uuid');

http.createServer(function (req, res) {
    processServices(function () {
        res.writeHead(200, { 'Content-Type': 'text/plain' });
        res.end('hello world');
    });
}).listen(process.env.port);

function processServices(callback) {
    var sb1 = azure.createServiceBusService(config.serviceBusNamespace, config.serviceBusAccessKey);
    sb1.receiveQueueMessage('startup', function (error, m) {
        if (!error) {
            writeMessage(JSON.stringify(m), function () {
                callback();
            });
        }
        else {
            writeMessage(JSON.stringify(error), function () {
                callback();
            });
        }
    });
}

function writeMessage(message, callback) {
    var serviceClient = azure.ServiceClient;
    var ts1 = azure.createTableService(serviceClient.DEVSTORE_STORAGE_ACCOUNT, serviceClient.DEVSTORE_STORAGE_ACCESS_KEY, serviceClient.DEVSTORE_TABLE_HOST);

    ts1.getTable('Messages', function (error) {
        if (error === null) {
            var messageEntity = {
                PartitionKey: '0',
                RowKey: uuid(),
                Message: message
            };

            ts1.insertEntity('Messages', messageEntity, function (error, newMessage) {
                callback();
            });
        }
        else callback();
    });
}

这在我的 server.js 网络角色中设置队列

var sb1 = azure.createServiceBusService(config.serviceBusNamespace, config.serviceBusAccessKey);
sb1.getQueue('startup', function (error, queue) {
    if (error) {
        sb1.createQueueIfNotExists('startup', function (error, queue) {
            if (!error)
                console.log("created startup queue 1: " + JSON.stringify(queue) + "\n");
            else
                console.log("don't got startup queue 1: " + JSON.stringify(error) + "\n");
        });
    }
    else console.log("created startup queue 2: " + JSON.stringify(queue) + "\n");
});
sb1.getQueue('serialnumbers', function (error, queue) {
    if (error) {
        sb1.createQueueIfNotExists('serialnumbers', function (error, queue) {
            if (!error)
                console.log("created serialnumbers queue 1: " + JSON.stringify(queue) + "\n");
            else
                console.log("don't got serialnumbers queue 1: " + JSON.stringify(error) + "\n");
        });
    }
    else console.log("created serialnumbers queue 2: " + JSON.stringify(queue) + "\n");
});

这在我的网络角色 index.js 文件中向队列发送消息

var azure = require('azure')
    , config = require('../utils/config');

exports.index = function (req, res) {
    var sb1 = azure.createServiceBusService(config.serviceBusNamespace, config.serviceBusAccessKey);
    var startupMessage = {
        body: ''
    };

    sb1.getQueue('startup', function (error, queue) {
        if (!error) {
            sb1.sendQueueMessage('startup', startupMessage, function (error) {
                if (!error) {
                    console.log("sent startup message 1\n");

                    res.render('index', {
                        locals: {
                            pageTitle: 'Home'
                        }
                    });
                }
                else {
                    console.log("didn't send startup message 1: " + JSON.stringify(error) + "\n");

                    res.render('index', {
                        locals: {
                            pageTitle: 'Home'
                        }
                    });
                }
            });
        }
        else {
            res.render('index', {
                locals: {
                    pageTitle: 'Home'
                }
            });
        }
    });
};

如何获取,以便在 web 角色运行 index.js 文件时,worker 角色进行侦听和执行?

目前它没有这样做,消息在队列中但没有被工作角色读取?

如何让工作角色从队列中读取消息?

我应该运行 cron 作业还是使用 socket.io?我有点困惑。

【问题讨论】:

    标签: node.js azure


    【解决方案1】:

    npm azure 包的问题,​​它通过下面的 HTTP 工作,你知道 HTTP 是请求/响应协议。 (不幸的是,在这个应用层没有像 SignalR 或类似的其他“推送”类型的通信)。所以唯一的方法是拉取消息 -((.

    但是,如果您使用 AMQP 协议,还有另一种消费(监听)消息的方式。

    有一个名为amqp10 的npm 包(基于promise,符合AMQP 1.0 的node.js 客户端),您可以在其中订阅消息并开始监听,类似于httpServer 或expressjs 的概念。在他们的 npmjs 包主页上,您可以找到文档和示例。

    简而言之,它看起来像这样: ..

    async function sample( {connectionString, topicSubscriptionPath} ){
       const { Client, Policy } = require('amqp10');
       const client = new Client(Policy.Utils.RenewOnSettle(1, 1, Policy.ServiceBusTopic));
       const connection = await client.connect(connectionString);
       const receiver = await connection.createReceiver(topicSubscriptionPath);
    
       receiver.on('message', (message) => {
             try {
               // here is your brokered message, do whatever u need to do
               receiver.accept(message); //if can be marked as processed, aka can be deleted from service bus
             } catch (error) {
               this.receiver.reject(message); //return back to service bus
             }
           });
       }
    sample(buildConnectionString(), 'subscriptionNameInYourTopic');
    
    function buildConnectionString(){
      //very specific format, keys you can find in your azure subscription
       return `amqps://${sharedAccessKeyName}:${sharedAccessKey}@${queueEndpoint}`;
    }
    }
    

    【讨论】:

      【解决方案2】:

      您只需要使用 .receiveQueueMessage 方法不断地轮询队列,直到收到来自队列的消息。投票之间的时间间隔完全由您决定。

      【讨论】:

      • 嗨 bbbonthemoon,根据我写的代码,你能给我一个例子来说明你的意思吗?欢呼
      • 嗯,问题是,服务总线队列客户端不会等待接收消息,当您使用 receiveQueueMessage() 进行轮询时,它只是检查是否有消息给您,如果是的话接收它,而不是接收它——它什么也没接收,然后离开。所以你需要一次又一次地调用这个方法,直到收到消息。在异步环境中创建无限循环是单独的主题,我通常用事件来做。我检查自定义事件的队列,并在我准备好再次轮询时触发事件。
      • 好的 bbbonthemoon 谢谢,我想了很多,最适合 node.js 的是 cron 模块,所以我会使用它。欢呼
      • 我认为这不准确。对receiveQueueMessage() 的调用将检查并等待,直到超时间隔过去,可以设置up to 24 days。长时间投票的成本要低得多,因为每次投票都会向您收费。 See here 了解更多详情。
      • 非常感谢乔尔!每 5 秒汇集一次,它工作得非常糟糕,直到很久以后才收到消息。现在它是一种享受!
      猜你喜欢
      • 1970-01-01
      • 1970-01-01
      • 2013-11-09
      • 2014-11-03
      • 2017-11-01
      • 1970-01-01
      • 1970-01-01
      • 2013-02-28
      相关资源
      最近更新 更多