【问题标题】:How to fetch messages from an Azure Service Bus Queue in "PeekLock" mode using AMQP?如何使用 AMQP 在“PeekLock”模式下从 Azure 服务总线队列中获取消息?
【发布时间】:2017-09-07 17:59:56
【问题描述】:

我们正在尝试在 Node 应用程序中使用 Azure 服务总线。 我们的要求是从一个队列中获取多条消息

由于 Azure SDK for Node 不支持批量检索,我们决定使用 AMQP。虽然我们能够使用此处所述的 Peek Messages 获取消息 (https://docs.microsoft.com/en-us/azure/service-bus-messaging/service-bus-amqp-request-response#message-operations)。

我们注意到,一旦获取消息,它们就会从队列中删除。我想知道是否有人深入了解我们如何使用 AMQP 和 Node.js 在“PeekLock”模式下获取消息。对于 AMQP,我们使用 amqp10 节点包 (https://www.npmjs.com/package/amqp10)。

这是我们查看消息的代码:

const AMQPClient = require('amqp10/lib').Client,
Policy = require('amqp10/lib').Policy;

const protocol = 'amqps';
const keyName = 'RootManageSharedAccessKey';
const sasKey = 'My Shared Access Key'
const serviceBusHost = 'account-name.servicebus.windows.net';
const uri = protocol + '://' + encodeURIComponent(keyName) + ':' + encodeURIComponent(sasKey) + '@' + serviceBusHost;
const queueName = 'test1';
var client = new AMQPClient(Policy.ServiceBusQueue);
client.connect(uri)
.then(function () {
    return Promise.all([
        client.createReceiver(queueName),
        client.createSender(queueName)
    ]);
})
.spread(function(receiver, sender) {
    console.log(receiver);
    console.log(sender);
    console.log('--------------------------------------------------------------------------');
    receiver.on('errorReceived', function(err) {
        // check for errors
        console.log(err);
    });
    receiver.on('message', function(message) {
        console.log('Received message');
        console.log(message);
        console.log('------------------------------------');
    });

    return sender.send([], {
        operation: 'com.microsoft:peek-message',
        'message-count': 5
    });
})
.error(function (e) {
    console.warn('connection error: ', e);
});

【问题讨论】:

  • 谢谢阿拉文德。该代码从队列中获取一条消息。这在 Azure SDK 中也可用。我们正在寻找的是从队列中获取多条消息。
  • 在创建队列时,您必须将 EnableBatchedOperations 设置为 true。但同样在队列客户端,您需要设置预取计数。不确定是否在 nodejs sdk 中启用了该属性。或者您可以创建多个接收器并尝试。
  • 你能找到解决办法吗?
  • @Dhiren 通过下面的回答,我们能够获取消息,但无法解锁和更新这些消息的锁定。

标签: node.js azure amqp azure-servicebus-queues


【解决方案1】:

默认接收器工作在auto-settle模式,你必须将其更改为settle on disposition

const { Constants } = require('amqp10')

// 
// ...create client, connect, etc...
//

// Second parameter of createReceiver method enables overwriting policy parameters
const receiver = client.createReceiver(queueName, {
  attach: {
    rcvSettleMode: Constants.receiverSettleMode.settleOnDisposition
  }
})

处理后不要忘记接受/拒绝/释放消息:

receiver.on('message', msg => {
  //
  // ...do something smart with a message...
  //

  receiver.accept(msg) // <- manually settle a message
})

【讨论】:

  • 感谢@qzb。使用 rcvSettleMode 创建一个接收器链接作为 solveOnDisposition 有帮助。现在消息没有被删除,但我无法获取与消息关联的锁定令牌。
  • 这有点令人困惑。显然,为了接受/拒绝消息,AMQP 客户端必须使用delivery-tag。在 amp10 库的情况下,delivery-tag 存储在 msg._deliveryId 字段中。但它看起来不像一个lock-token,它看起来像一个序列号。
  • 是的,但是要更新和解锁消息,我们需要 lock-token。如果我们不能得到 lock-token,就不能使用 amqp 来接收消息。
  • AFAIK, receiver.release(msg) 解锁消息。不知道能不能续订。
  • 因为我们的要求是获取多条消息,这在 node.js SDK 中是不可能的。因此,我们尝试使用 amqp 获取消息,然后使用 node.js SDK 解锁和更新 msg 锁。但由于我们没有获得锁定令牌,我们不能使用 amqp。
猜你喜欢
  • 2017-04-22
  • 1970-01-01
  • 2021-07-26
  • 1970-01-01
  • 1970-01-01
  • 2023-04-06
  • 2016-03-15
  • 2019-06-22
  • 2021-10-31
相关资源
最近更新 更多