【问题标题】:Javascript Promises/Q - Am I Doing This Right?Javascript Promises/Q - 我这样做对吗?
【发布时间】:2014-06-28 00:04:15
【问题描述】:

我正在尝试在 NodeJS 中使用 Q 来创建一些 amqplib 包装器。包装器工作正常(到目前为止),但我觉得我对 Q 的使用......不正确。

首先,有一个初始化方法:

private static Startup(): void {
    var sub_YoMsgHandler = (msg: any) => {
        console.log(util.format('Received Yo: %s', msg.content.toString()));
        var bmy: dto.interfaces.IBusManifestYo = JSON.parse(msg.content.toString());
    }

    var sub_TelemetryMsgHandler = (msg: any) => {
        var bmy: dto.interfaces.IAsyncProcessorCommand = JSON.parse(msg.content.toString());
        console.log(util.format('Received Telemetry: %s', msg.content.toString()));
    }

    Play.AMQ.Open.then((connection) => {
        Play.AMQ.ConfirmChannel = connection.createConfirmChannel();
        Play.AMQ.ConfirmChannel.then((confirmChannel) => {
            confirmChannel.on('error', Play.handleChannelError);

            Play.AMQ.CommandQueue = confirmChannel.assertQueue('AsyncProcessorCommandQueue', { durable: true, exclusive: false, autoDelete: false });
            Play.AMQ.TelemetryQueue = confirmChannel.assertQueue('AsyncProcessorTelemetryQueue', { durable: true, exclusive: false, autoDelete: false });
            Play.ReceiveTelemetry(sub_TelemetryMsgHandler);

            Play.CreateConsumer('Node', dto.BusManifestYo.Type, sub_YoMsgHandler).then((consumerTag) => {
                //track consumer tags in CreateConsumer?
                Play.AMQ.Subscribers.push(consumerTag);
            });
        });
    });
}

我有没有提到我正在使用 TypeScript?该方法是连接、创建通道、创建两个发送/接收队列、创建两个订阅者——然后将连接、通道和队列承诺保存到一个对象中。那么这里是创建订阅者(消费者)的一种方法:

private static CreateConsumer(name: string, type: string, handler: (msg: any) => void): Q.Promise<string> {
    var qid = type + '_' + name;

    return Play.AMQ.ConfirmChannel.then((confirmChannel) => {
        return confirmChannel.assertQueue(qid, { durable: true, exclusive: false, autoDelete: false }).then((okQueueReply) => {
            return confirmChannel.assertExchange(type, 'topic', { durable: true, autoDelete: false }).then((okExchangeReply) => {
                return confirmChannel.bindQueue(qid, type, '').then((okBindReply) => {
                    return confirmChannel.consume(qid, (msg) => {
                        handler(msg);
                        confirmChannel.ack(msg);
                    });
                });
            });
        });
    },
    (failReason) => {
        throw new Error('create consumer issue: ' + failReason);
    });
}

最后,这是我的发布方法:

 private static Publish(obj: dto.interfaces.IPublishable): Q.Promise<boolean> {
    var ackDeferred = Q.defer<boolean>();

    var handleChannelConfirm = (err, ok): void => {
        if (err !== null) {
            console.warn('Message nacked!');
            ackDeferred.resolve(false);
        }
        else {
            console.log('Message acked');
            ackDeferred.resolve(true);
        }
    }

    // '#' instead of ''?
    Play.AMQ.ConfirmChannel.then((confirmChannel) => {
        confirmChannel.assertExchange(obj.GetType(), 'topic', { durable: true, autoDelete: false }).then((okExchangeReply) => {
            confirmChannel.publish(obj.GetType(), '', Play.ToBuffer(obj), { type: obj.GetType() }, handleChannelConfirm);
        });
    },
    (failReason) => {
        throw new Error('create consumer issue: ' + failReason);
    });

    return ackDeferred.promise;
}

正如我所说,所有工作都有效,但感觉就像我没有以正确或推荐的方式使用承诺。

这里有什么明显的失误吗?或者我做得对吗?具体来说,我想,我对我的链接和错误处理很好奇(我认为错误处理特别有可能是错误的)。向我展示了在发布方法中采用回调样式处理程序并对其进行承诺的正确方法的奖励积分...

【问题讨论】:

    标签: node.js typescript promise amqp q


    【解决方案1】:

    如果 Q 遵循承诺规范,它应该像那样工作

    return Play.AMQ.ConfirmChannel
    .then(confirmChannel => confirmChannel.assertQueue(qid, { durable: true, exclusive: false, autoDelete: false }))
    .then(okQueueReply => confirmChannel.assertExchange(type, 'topic', { durable: true, autoDelete: false }))
    .then(okExchangeReply => confirmChannel.bindQueue(qid, type, ''))
    .then(okBindReply => confirmChannel.consume(qid))
    .then(msg => {
        handler(msg);
        confirmChannel.ack(msg);
    });
    

    从最后一个返回的东西也很有意义。

    then 方法中的回调返回一个新的 Promise,因此您可以在不嵌套回调的情况下链接它们。

    【讨论】:

    • 补充一点,.then中的实现回调将promise的解包值(最终结果)作为参数,如果你返回一个,它的返回值要么是直接值,要么是如果您返回一个承诺,则返回的承诺的最终值。就像你说的,它还返回一个新值的新承诺。
    • 好答案。我还要提一下,如果您想为thens 中的 任何 步骤处理错误,可以使用最终的 .catch
    • 良好的反馈 - 我需要多花一点时间,但我可能会接受它。我没有意识到第一个then 中的confirmChannel 之类的值将在随后的thens 范围内。 catch 也有好处。我知道我需要这样的东西,但究竟是什么在逃避我。
    【解决方案2】:

    总结你的想法,

    /*
    Open() // connecting...
        .then confirmChannel()  //create a channel
            .then function () {
                commandQueue(); //create send queue?
                TelemetryQueue(); // create recv queue?
            }
                .then createConsumer(); //create two subsribers?
    
    
    .catch(function (err) {
        The most outter error handler.
    })
    .end() terminate the promise chain
    */
    

    我会一一介绍功能

    Play.AMQ.Open.then((connection) => {
    
            Play.AMQ.ConfirmChannel = connection.createConfirmChannel();
            return Play.AMQ.ConfirmChannel.then((confirmChannel) => {
            //since ConfirmChannel() is a promise, it means channel is working I assume if you can run this callback
            //confirmChannel.on('error', Play.handleChannelError); 
    
                //No idea following codes are promise or not...
            //Assuming that following 3 lines won't be fail
                    Play.AMQ.CommandQueue = confirmChannel.assertQueue('AsyncProcessorCommandQueue', { durable: true, exclusive: false, autoDelete: false });
                    Play.AMQ.TelemetryQueue = confirmChannel.assertQueue('AsyncProcessorTelemetryQueue', { durable: true, exclusive: false, autoDelete: false });
                    Play.ReceiveTelemetry(sub_TelemetryMsgHandler);
    
            //Chainning promise....
            //Btw... I am curious why you call a `prviate` method directly
                    return Play.CreateConsumer('Node', dto.BusManifestYo.Type, sub_YoMsgHandler).then((consumerTag) => {
                        //track consumer tags in CreateConsumer?
                        Play.AMQ.Subscribers.push(consumerTag);
                    });
            });
    
    })
    .end(); //Catch any un-handled errors and terminate the Q chain (EG error from confirmChannel())
    
    
    
    private static CreateConsumer(name: string, type: string, handler: (msg: any) => void): Q.Promise<string> {
        var qid = type + '_' + name;
    
        //I suggest you put `confirmChannel` as one of the input arguments since this method will only be called after open() and confirmChannel()
        return Play.AMQ.ConfirmChannel.then((confirmChannel) => {
            return confirmChannel.assertQueue(qid, { durable: true, exclusive: false, autoDelete: false }).then((okQueueReply) => {
                return confirmChannel.assertExchange(type, 'topic', { durable: true, autoDelete: false }).then((okExchangeReply) => {
                    return confirmChannel.bindQueue(qid, type, '').then((okBindReply) => {
                        return confirmChannel.consume(qid, (msg) => {
                            handler(msg);
                            confirmChannel.ack(msg);
                        });
                    });
                });
            });
        }
        //I am not familiarize syntax on TypeScript, I think below error handler is an input argument of Play.AMQ.ConfirmChannel.then()
        //If so, this error handler can handler error from ConfirmChannel() only.
        ,
        (failReason) => {
        throw new Error('create consumer issue: ' + failReason);
        });
    
    }
    
    
    
    /*
    I am not sure the intent (or command flow?) of your below function.
    
    1. If ConfirmChannel() fail, I think your program will crash since no one handling below error
        throw new Error('create consumer issue: ' + failReason);
    
    2. No idea to figure out the relationship between 2 promises ackDeferred and Play.AMQ.ConfirmChannel
    **/
    private static Publish(obj: dto.interfaces.IPublishable): Q.Promise<boolean> {
        var ackDeferred = Q.defer<boolean>();
    
        var handleChannelConfirm = (err, ok): void => {
        if (err !== null) {
            console.warn('Message nacked!');
            ackDeferred.resolve(false);
        }
        else {
            console.log('Message acked');
            ackDeferred.resolve(true);
        }
        }
    
        // '#' instead of ''?
        Play.AMQ.ConfirmChannel.then((confirmChannel) => {
            confirmChannel.assertExchange(obj.GetType(), 'topic', { durable: true, autoDelete: false }).then((okExchangeReply) => {
            confirmChannel.publish(obj.GetType(), '', Play.ToBuffer(obj), { type: obj.GetType() }, handleChannelConfirm);
        });
        },
        (failReason) => {
        throw new Error('create consumer issue: ' + failReason);
        });
    
        return ackDeferred.promise;
    }
    

    希望对你有帮助。

    顺便说一句....如何开启语法高亮:D?

    【讨论】:

    • 提交时默认开启。它默认为标记语言,但您可以强制使用它。另外,感谢您的努力,但这是一个比另一个更糟糕的答案,所以我不赞成它,以便让另一个答案更具可见性。
    • 您能给我一些提示,以便我下次做得更好吗?
    • 当然,看看其他答案。
    • 另一个答案提供了与第二个功能块等效的代码,但呈现更好。当然,如果我们写在这个优雅的解决方案中,可以避免像ghost promisebroken of chain 这样的问题。我的回答解决了我在源代码中看到的这些问题。第一个功能块有断链,而第三个功能块可能存在幽灵承诺。此外,可能是第二个功能块上放错位置的错误处理程序。
    猜你喜欢
    • 2012-05-27
    • 2012-08-13
    • 2023-03-19
    • 1970-01-01
    • 2015-01-13
    • 1970-01-01
    • 1970-01-01
    • 2015-06-09
    • 1970-01-01
    相关资源
    最近更新 更多