【问题标题】:NodeJs delay each promise within Promise.all()NodeJs 在 Promise.all() 中延迟每个承诺
【发布时间】:2018-05-03 04:13:39
【问题描述】:

我正在尝试更新一个不久前创建的使用 nodejs 的工具(我不是 JS 开发人员,所以我正在尝试将代码拼凑在一起)并且卡在最后一个障碍。

新功能将采用 swagger .json 定义,使用适用于 JS 的 'aws-sdk' 开发工具包将端点与 AWS 服务上匹配的 API 网关进行比较,然后相应地更新网关。

代码在一个小的定义文件(大约 15 个端点)上运行良好,但是一旦我给它一个更大的文件,我就开始收到大量的 TooManyRequestsException 错误。

我了解这是因为我对 API Gateway 服务的调用太快,需要延迟/暂停。这就是我卡住的地方

我已尝试添加;

  • 每个返回的承诺的延迟()
  • 在每个承诺中运行 setTimeout()
  • 为 Promise.all 和 Promise.mapSeries 添加延迟

目前我的代码循环遍历定义中的每个端点,然后将每个承诺的响应添加到一个承诺数组:

promises.push(getMethodResponse(resourceMethod, value, apiName, resourcePath)); 

循环结束后,我运行这个:

        return Promise.all(promises)
        .catch((err) => {
            winston.error(err);
        })

我在 mapSeries 上尝试过同样的方法(不走运)。

看起来 (getMethodResponse promise) 中的函数会立即运行,因此,无论我添加什么类型的延迟,它们都仍然只是执行。我怀疑我需要让 (getMethodResponse) 返回一个函数,然后使用 mapSeries 但我也无法让它工作。

我试过的代码: 将getMethodResponse 包裹在此:

return function(value){}

然后在循环之后添加这个(和循环内 - 没有区别):

 Promise.mapSeries(function (promises) {
 return 'a'();
 }).then(function (results) {
 console.log('result', results);
 });

还尝试了许多其他建议:

Here

Here

有什么建议吗?

编辑

根据要求,提供一些额外的代码来尝试查明问题。

当前使用一小组端点的代码(在 Swagger 文件中):

module.exports = (apiName, externalUrl) => {

return getSwaggerFromHttp(externalUrl)
    .then((swagger) => {
        let paths = swagger.paths;
        let resourcePath = '';
        let resourceMethod = '';
        let promises = [];

        _.each(paths, function (value, key) {
            resourcePath = key;
            _.each(value, function (value, key) {
                resourceMethod = key;
                let statusList = [];
                _.each(value.responses, function (value, key) {
                    if (key >= 200 && key <= 204) {
                        statusList.push(key)
                    }
                });
                _.each(statusList, function (value, key) { //Only for 200-201 range  

                    //Working with small set 
                    promises.push(getMethodResponse(resourceMethod, value, apiName, resourcePath))
                });             
            });
        });

        //Working with small set
        return Promise.all(promises)
        .catch((err) => {
            winston.error(err);
        })
    })
    .catch((err) => {
        winston.error(err);
    });

};

我已经尝试添加这个来代替 return Promise.all():

            Promise.map(promises, function() {
            // Promise.map awaits for returned promises as well.
            console.log('X');
        },{concurrency: 5})
        .then(function() {
            return console.log("y");
        });

这样的结果会吐出这样的东西(每个端点都一样,有很多):

错误:TooManyRequestsException:请求过多 X 错误:TooManyRequestsException:请求太多 X 错误:TooManyRequestsException:请求太多

AWS SDK 在每个 Promise 中被调用 3 次,其函数是(从 getMethodResponse() 函数获取):

apigateway.getRestApisAsync()
return apigateway.getResourcesAsync(resourceParams)
apigateway.getMethodAsync(params, function (err, data) {}

典型的 AWS 开发工具包文档指出,这是进行太多连续调用(太快)时的典型行为。我过去遇到过类似的问题,只需在被调用的代码中添加一个 .delay(500) 即可解决;

类似:

    return apigateway.updateModelAsync(updateModelParams)
    .tap(() => logger.verbose(`Updated model ${updatedModel.name}`))
    .tap(() => bar.tick())
    .delay(500)

编辑#2

我想以彻底的名义,包括我的整个.js 文件。

'use strict';

const AWS = require('aws-sdk');
let apigateway, lambda;
const Promise = require('bluebird');
const R = require('ramda');
const logger = require('../logger');
const config = require('../config/default');
const helpers = require('../library/helpers');
const winston = require('winston');
const request = require('request');
const _ = require('lodash');
const region = 'ap-southeast-2';
const methodLib = require('../aws/methods');

const emitter = require('../library/emitter');
emitter.on('updateRegion', (region) => {
    region = region;
    AWS.config.update({ region: region });
    apigateway = new AWS.APIGateway({ apiVersion: '2015-07-09' });
    Promise.promisifyAll(apigateway);
});

function getSwaggerFromHttp(externalUrl) {
    return new Promise((resolve, reject) => {
        request.get({
            url: externalUrl,
            header: {
                "content-type": "application/json"
            }
        }, (err, res, body) => {
            if (err) {
                winston.error(err);
                reject(err);
            }

            let result = JSON.parse(body);
            resolve(result);
        })
    });
}

/*
    Deletes a method response
*/
function deleteMethodResponse(httpMethod, resourceId, restApiId, statusCode, resourcePath) {

    let methodResponseParams = {
        httpMethod: httpMethod,
        resourceId: resourceId,
        restApiId: restApiId,
        statusCode: statusCode
    };

    return apigateway.deleteMethodResponseAsync(methodResponseParams)
        .delay(1200)
        .tap(() => logger.verbose(`Method response ${statusCode} deleted for path: ${resourcePath}`))
        .error((e) => {
            return console.log(`Error deleting Method Response ${httpMethod} not found on resource path: ${resourcePath} (resourceId: ${resourceId})`); // an error occurred
            logger.error('Error: ' + e.stack)
        });
}

/*
    Deletes an integration response
*/
function deleteIntegrationResponse(httpMethod, resourceId, restApiId, statusCode, resourcePath) {

    let methodResponseParams = {
        httpMethod: httpMethod,
        resourceId: resourceId,
        restApiId: restApiId,
        statusCode: statusCode
    };

    return apigateway.deleteIntegrationResponseAsync(methodResponseParams)
        .delay(1200)
        .tap(() => logger.verbose(`Integration response ${statusCode} deleted for path ${resourcePath}`))
        .error((e) => {
            return console.log(`Error deleting Integration Response ${httpMethod} not found on resource path: ${resourcePath} (resourceId: ${resourceId})`); // an error occurred
            logger.error('Error: ' + e.stack)
        });
}

/*
    Get Resource
*/
function getMethodResponse(httpMethod, statusCode, apiName, resourcePath) {

    let params = {
        httpMethod: httpMethod.toUpperCase(),
        resourceId: '',
        restApiId: ''
    }

    return getResourceDetails(apiName, resourcePath)
        .error((e) => {
            logger.unimportant('Error: ' + e.stack)
        }) 
        .then((result) => {
            //Only run the comparrison of models if the resourceId (from the url passed in) is found within the AWS Gateway
            if (result) {
                params.resourceId = result.resourceId
                params.restApiId = result.apiId

                var awsMethodResponses = [];
                try {
                    apigateway.getMethodAsync(params, function (err, data) {
                        if (err) {
                            if (err.statusCode == 404) {
                                return console.log(`Method ${params.httpMethod} not found on resource path: ${resourcePath} (resourceId: ${params.resourceId})`); // an error occurred
                            }
                            console.log(err, err.stack); // an error occurred
                        }
                        else {
                            if (data) {
                                _.each(data.methodResponses, function (value, key) {
                                    if (key >= 200 && key <= 204) {
                                        awsMethodResponses.push(key)
                                    }
                                });
                                awsMethodResponses = _.pull(awsMethodResponses, statusCode); //List of items not found within the Gateway - to be removed.
                                _.each(awsMethodResponses, function (value, key) {
                                    if (data.methodResponses[value].responseModels) {
                                        var existingModel = data.methodResponses[value].responseModels['application/json']; //Check if there is currently a model attached to the resource / method about to be deleted
                                        methodLib.updateResponseAssociation(params.httpMethod, params.resourceId, params.restApiId, statusCode, existingModel); //Associate this model to the same resource / method, under the new response status
                                    }
                                    deleteMethodResponse(params.httpMethod, params.resourceId, params.restApiId, value, resourcePath)
                                        .delay(1200)
                                        .done();
                                    deleteIntegrationResponse(params.httpMethod, params.resourceId, params.restApiId, value, resourcePath)
                                        .delay(1200)
                                        .done();
                                })
                            }
                        }
                    })
                        .catch(err => {
                            console.log(`Error: ${err}`);
                        });
                }
                catch (e) {
                    console.log(`getMethodAsync failed, Error: ${e}`);
                }
            }
        })
};

function getResourceDetails(apiName, resourcePath) {

    let resourceExpr = new RegExp(resourcePath + '$', 'i');

    let result = {
        apiId: '',
        resourceId: '',
        path: ''
    }

    return helpers.apiByName(apiName, AWS.config.region)
        .delay(1200)
        .then(apiId => {
            result.apiId = apiId;

            let resourceParams = {
                restApiId: apiId,
                limit: config.awsGetResourceLimit,
            };

            return apigateway.getResourcesAsync(resourceParams)

        })
        .then(R.prop('items'))
        .filter(R.pipe(R.prop('path'), R.test(resourceExpr)))
        .tap(helpers.handleNotFound('resource'))
        .then(R.head)
        .then([R.prop('path'), R.prop('id')])
        .then(returnedObj => {
            if (returnedObj.id) {
                result.path = returnedObj.path;
                result.resourceId = returnedObj.id;
                logger.unimportant(`ApiId: ${result.apiId} | ResourceId: ${result.resourceId} | Path: ${result.path}`);
                return result;
            }
        })
        .catch(err => {
            console.log(`Error: ${err} on API: ${apiName} Resource: ${resourcePath}`);
        });
};

function delay(t) {
    return new Promise(function(resolve) { 
        setTimeout(resolve, t)
    });
 }

module.exports = (apiName, externalUrl) => {

    return getSwaggerFromHttp(externalUrl)
        .then((swagger) => {
            let paths = swagger.paths;
            let resourcePath = '';
            let resourceMethod = '';
            let promises = [];

            _.each(paths, function (value, key) {
                resourcePath = key;
                _.each(value, function (value, key) {
                    resourceMethod = key;
                    let statusList = [];
                    _.each(value.responses, function (value, key) {
                        if (key >= 200 && key <= 204) {
                            statusList.push(key)
                        }
                    });
                    _.each(statusList, function (value, key) { //Only for 200-201 range  

                        promises.push(getMethodResponse(resourceMethod, value, apiName, resourcePath))

                    });             
                });
            });

            //Working with small set
            return Promise.all(promises)
            .catch((err) => {
                winston.error(err);
            })
        })
        .catch((err) => {
            winston.error(err);
        });
};

【问题讨论】:

  • 在 Bluebird 中使用 Promise.map() 选项并使用 concurrency 选项来确定同时有多少请求在进行中,并确保在回调到 @987654341 之前不会启动请求@ 叫做。如果TooManyRequestsException 在一个时间段内请求太多,而不是同时请求太多,那么您将需要插入延迟。
  • 请参阅:How to insert a delay in a promise chain,了解如何插入延迟。
  • @jfriend00 感谢您的帮助,看来我越来越近了——我们有机会请conversation 完成它。我给你看会更容易吗?
  • 如果您显示您最近尝试过的实际代码(通过将其添加到您的问题中)并描述剩余的问题是什么,我可以帮助您解决它。除了当前代码的状态之外,主要问题是究竟是什么触发了 TooManyRequestsException 错误。同时请求?请求/秒?别的东西。在不知道实际限制是多少的情况下,您只是在猜测和测试哪个不是很好。 stackoverflow的方式是在这里解决问题,而不是离线对话。
  • @jfriend00 公平点,我已经用重要部分更新了问题,然后是整个 js 文件以供参考。这段代码还是很粗糙,我想在重构之前让它工作,谢谢!

标签: node.js aws-api-gateway aws-cli aws-sdk-js


【解决方案1】:

您显然对Promise.all()Promise.map() 的作用存在误解。

Promise.all() 所做的只是跟踪一整套 Promise,告诉您它们所代表的异步操作何时全部完成(或返回错误)。当您向它传递一系列承诺时(正如您所做的那样),所有这些异步操作都已经并行启动。因此,如果您试图限制同时运行的异步操作的数量,那么此时已经为时已晚。因此,Promise.all() 本身并不能帮助您以任何方式控制同时运行的数量。

我也注意到,这行promises.push(getMethodResponse(resourceMethod, value, apiName, resourcePath)) 似乎实际上是在执行承诺,而不是简单地将它们添加到数组中。似乎最后一个Promise.all() 实际上并没有做太多。

是的,当您执行promises.push(getMethodResponse()) 时,您会立即调用getMethodResponse()。这会立即启动异步操作。然后该函数返回一个promise,Promise.all() 将监视该promise(以及您放入数组中的所有其他promise)以告诉您它们何时完成。这就是Promise.all() 所做的一切。它监控您已经开始的操作。要将同时进行的最大请求数保持在某个阈值以下,您必须不要像您正在做的那样一次启动所有异步操作。 Promise.all() 不会为你这样做。


为了让 Bluebird 的 Promise.map() 为您提供帮助,您必须向它传递一个 DATA 数组,而不是 promise。当你向它传递一组代表你已经开始的异步操作的 Promise 时,它​​只能做Promise.all() 所能做的事情。但是,如果您向它传递一个数据数组和一个回调函数,该函数可以为数组中的每个数据元素启动异步操作,那么当您使用 concurrency 选项时,它可以为您提供帮助。

您的代码非常复杂,因此我将使用一个简单的网络爬虫来说明,它想要读取大量 URL 列表,但出于内存考虑,一次只能处理 20 个。

const rp = require('request-promise');
let urls = [...];    // large array of URLs to process

Promise.map(urls, function(url) {
    return rp(url).then(function(data) {
        // process scraped data here
        return someValue;
    });
}, {concurrency: 20}).then(function(results) {
   // process array of results here
}).catch(function(err) {
    // error here
});

在这个例子中,希望你能看到一个数据项数组被传递到Promise.map()(不是一个promise数组)。然后,Promise.map() 可以管理数组的处理方式/时间,在这种情况下,它将使用concurrency: 20 设置来确保同时进行的请求不超过 20 个。


您使用Promise.map() 的努力是传递了一系列承诺,这对您没有帮助,因为承诺代表已经开始的异步操作:

Promise.map(promises, function() {
    ...
});

然后,此外,您确实需要通过阅读有关目标 API 的文档或通过进行大量测试来找出导致 TooManyRequestsException 错误的确切原因,因为可能有各种各样的事情可能会导致这种情况,并且在不确切知道您需要控制什么的情况下,只需要进行很多疯狂的猜测来尝试找出可能有效的方法。 API 可能检测到的最常见的事情是:

  1. 来自同一帐户或来源的同时请求。
  2. 单位时间内来自同一帐户或来源的请求数(例如每秒请求数)。

Promise.map() 中的 concurrency 操作很容易帮助您使用第一个选项,但不一定会帮助您使用第二个选项,因为您可以限制同时请求的数量很少,但仍然超过每秒请求数限制.第二个需要一些实际的时间控制。插入delay() 语句有时会起作用,但即使这样也不是一种非常直接的管理方法,并且会导致控制不一致(有时有效,但有时无效)或次优控制(将自己限制在很远的地方低于你实际可以使用的)。

要管理每秒请求的限制,您需要使用速率限制库或您自己的代码中的实际速率限制逻辑进行一些实际的时间控制。

以下是限制每秒请求数的方案示例:How to Manage Requests to Stay Below Rate Limiting

【讨论】:

  • 感谢您的回复,我遇到的问题是如何从我的函数返回“数据”而不是承诺。我已经尝试将它们包装在 return function (value) {} 中,但这并没有真正帮助 - 我正在考虑的替代选项(尚未尝试)是从正在使用的方法名称中删除 Async?您的示例确实有意义,但正如您所提到的,这是一个非常简单的示例,它不使用方法,而是使用字符串 URL。
  • @Hexie - 我的示例向您展示了如何获取所有结果 - 就在它说“在此处处理结果数组”的位置。这些是异步操作,因此您不能直接从函数返回结果。您将不得不使用 .then() 来获得结果,就像我的示例所示。
  • 我指的是更改我当前使用的函数 (getMethodResponse) 以不返回它们所做的承诺,而是如您所说的“数据”。我正在尝试一些更改并假设我可以更改您的 rp(url) 以使用简单的 Resolve? Promise.map(promises, promise =&gt; {return Promise.resolve(promise).then(function (data)
  • @Hexie - 如果它们是异步操作,它们必须返回一个承诺。您不能从异步获取数据的事物中返回异步数据。我的rp(url) 代替了一些你已经拥有的异步操作,它返回一个承诺。你在那里插入你的异步函数,它返回一个承诺。可能你需要将你的问题分解成很多更小的部分,并询问你的一个问题。事实上,你有一个大而复杂的代码块,我们很难理解它的作用——我们很难帮助重写所有这些。
  • 所以事实证明,失败的原因是我的承诺链在方法中被破坏了。一旦它们都被正确链接,它就开始按预期工作。再次感谢您的帮助!
猜你喜欢
  • 2018-05-05
  • 2020-04-24
  • 2015-03-08
  • 2015-03-27
  • 2023-03-21
  • 1970-01-01
  • 1970-01-01
  • 2015-03-16
相关资源
最近更新 更多