【发布时间】:2017-05-06 03:39:28
【问题描述】:
我正在尝试使用 RabbitMQ 上的 Nodejs 模块 amqplib 来使用 blocked 和 unblocked 通道事件。我的理解是,如果系统资源达到“警报”状态,RabbitMQ 将向生产者发送连接阻塞命令。我的目的是利用它来确定生产者是否应该继续创建工作,或者以“稍后再试”作为回应。
amqplib 文档:http://www.squaremobius.net/amqp.node/channel_api.html#model_events
以下是我正在使用的软件版本:
- RabbitMQ 3.6.6,Erlang R16B03
- NodeJS 6.9.2
- amqplib ^0.5.1(节点模块)
- Ubuntu 14.04
我尝试过的事情
我的nodejs代码:
var amqp = require('amqplib');
amqp.connect('amqp://localhost').then((connection) => {
return connection.createChannel();
}).then((channel) => {
channel.on('blocked', (reason) => {
console.log(`Channel is blocked for ${reason}`);
});
channel.on('unblocked', () => {
console.log('Channel has been unblocked.');
});
return channel.assertQueue('myQueue', {
durable : true,
// This doesn't seem to do anything
arguments : {
"connection.blocked" : true
}
}).then(() => {
channel.sendToQueue('myQueue', new Buffer('some message'), {
persistent : true
});
});
});
我了解此特定功能是 AMQP 协议的扩展,需要启用/声明。我对 erlang 配置语法不是很熟悉。使用示例配置,我构建了一个如下所示的配置:
[{rabbit, [
{"server-properties", [
{"compatibilities", {
{ "connection.blocked", true }
}}
]}
根据此处的文档:https://www.rabbitmq.com/consumer-cancel.html#capabilities
“要接收这些通知,客户端必须出示 客户端属性中的功能表,其中有一个键 connection.blocked 和一个布尔值 true。查看功能 部分以获取更多详细信息。我们支持的客户表示 默认情况下此功能并提供一种注册处理程序的方法 connection.blocked 和 connection.unblocked 方法。”
然后使用service rabbitmq-server restart重新启动服务器。
这不会使服务器崩溃,但事件也不会触发。我期望在系统资源不足的情况下连接应该成为阻塞。 RabbitMQ 文档有一个关于Capabilities 的更多信息链接,但该链接已失效,我不确定还有什么可以尝试的。
【问题讨论】: