【发布时间】:2014-06-28 00:04:15
【问题描述】:
我正在尝试在 NodeJS 中使用 Q 来创建一些 amqplib 包装器。包装器工作正常(到目前为止),但我觉得我对 Q 的使用......不正确。
首先,有一个初始化方法:
private static Startup(): void {
var sub_YoMsgHandler = (msg: any) => {
console.log(util.format('Received Yo: %s', msg.content.toString()));
var bmy: dto.interfaces.IBusManifestYo = JSON.parse(msg.content.toString());
}
var sub_TelemetryMsgHandler = (msg: any) => {
var bmy: dto.interfaces.IAsyncProcessorCommand = JSON.parse(msg.content.toString());
console.log(util.format('Received Telemetry: %s', msg.content.toString()));
}
Play.AMQ.Open.then((connection) => {
Play.AMQ.ConfirmChannel = connection.createConfirmChannel();
Play.AMQ.ConfirmChannel.then((confirmChannel) => {
confirmChannel.on('error', Play.handleChannelError);
Play.AMQ.CommandQueue = confirmChannel.assertQueue('AsyncProcessorCommandQueue', { durable: true, exclusive: false, autoDelete: false });
Play.AMQ.TelemetryQueue = confirmChannel.assertQueue('AsyncProcessorTelemetryQueue', { durable: true, exclusive: false, autoDelete: false });
Play.ReceiveTelemetry(sub_TelemetryMsgHandler);
Play.CreateConsumer('Node', dto.BusManifestYo.Type, sub_YoMsgHandler).then((consumerTag) => {
//track consumer tags in CreateConsumer?
Play.AMQ.Subscribers.push(consumerTag);
});
});
});
}
我有没有提到我正在使用 TypeScript?该方法是连接、创建通道、创建两个发送/接收队列、创建两个订阅者——然后将连接、通道和队列承诺保存到一个对象中。那么这里是创建订阅者(消费者)的一种方法:
private static CreateConsumer(name: string, type: string, handler: (msg: any) => void): Q.Promise<string> {
var qid = type + '_' + name;
return Play.AMQ.ConfirmChannel.then((confirmChannel) => {
return confirmChannel.assertQueue(qid, { durable: true, exclusive: false, autoDelete: false }).then((okQueueReply) => {
return confirmChannel.assertExchange(type, 'topic', { durable: true, autoDelete: false }).then((okExchangeReply) => {
return confirmChannel.bindQueue(qid, type, '').then((okBindReply) => {
return confirmChannel.consume(qid, (msg) => {
handler(msg);
confirmChannel.ack(msg);
});
});
});
});
},
(failReason) => {
throw new Error('create consumer issue: ' + failReason);
});
}
最后,这是我的发布方法:
private static Publish(obj: dto.interfaces.IPublishable): Q.Promise<boolean> {
var ackDeferred = Q.defer<boolean>();
var handleChannelConfirm = (err, ok): void => {
if (err !== null) {
console.warn('Message nacked!');
ackDeferred.resolve(false);
}
else {
console.log('Message acked');
ackDeferred.resolve(true);
}
}
// '#' instead of ''?
Play.AMQ.ConfirmChannel.then((confirmChannel) => {
confirmChannel.assertExchange(obj.GetType(), 'topic', { durable: true, autoDelete: false }).then((okExchangeReply) => {
confirmChannel.publish(obj.GetType(), '', Play.ToBuffer(obj), { type: obj.GetType() }, handleChannelConfirm);
});
},
(failReason) => {
throw new Error('create consumer issue: ' + failReason);
});
return ackDeferred.promise;
}
正如我所说,所有工作都有效,但感觉就像我没有以正确或推荐的方式使用承诺。
这里有什么明显的失误吗?或者我做得对吗?具体来说,我想,我对我的链接和错误处理很好奇(我认为错误处理特别有可能是错误的)。向我展示了在发布方法中采用回调样式处理程序并对其进行承诺的正确方法的奖励积分...
【问题讨论】:
标签: node.js typescript promise amqp q