【问题标题】:AWS Lambda Node processing issue(AWS Lambda Node 处理问题)
【发布时间】:2021-01-10 15:35:24
【问题描述】:

我正在使用 AWS Lambda(Node 12.x)尝试实现以下用例:

  1. 调用数据库
  2. 解析数据库结果
  3. 依次调用 HTTPS Rest Endpoints
  4. 调用数据库进行插入
  5. 调用数据库进行查询
  6. 返回结果

完成这一切的代码是这样的

 var mysql = require('/opt/node_modules/mysql');
    var axios = require('axios');
    var qs = require('qs');
    
    var config = require('/opt/config.json');
    
    // Single MySQL connection created when the module is loaded, configured from
    // the values in /opt/config.json. Every query in this file goes through it.
    var con = mysql.createConnection({
        host: config.dbhost,
        user: config.dbuser,
        password: config.dbpassword,
        database: config.dbname
    });
    
    /**
     * Lambda entry point.
     *
     * Reads `userId` and a comma-separated `servicesType` list from
     * `event.queryStringParameters`, selects the matching active USER_PRODUCT rows,
     * hands each row to getData, then queries DATA and returns those rows through
     * `callback` as a 200 response with a JSON body.
     *
     * @param {object} event - Invocation event; `event.queryStringParameters.userId`
     *     and `event.queryStringParameters.servicesType` are read.
     * @param {object} context - Lambda context; `callbackWaitsForEmptyEventLoop` is set to false.
     * @param {Function} callback - Invoked as `callback(null, response)` for both the
     *     200 and the 400 response.
     */
    exports.handler = function(event, context, callback) {
        // Respond as soon as callback is invoked instead of waiting for the event loop to drain.
        context.callbackWaitsForEmptyEventLoop = false;
        // NOTE(review): nothing in this handler adds to `errors`, so the 400 body
        // below always carries an empty array.
        let errors = [];
        try {
            let userId = event.queryStringParameters.userId;
            let servicesType = event.queryStringParameters.servicesType;
            // Turn "A,B" into ["A", "B"] so it can be bound to the `in (?)` placeholder.
            servicesType = servicesType.split(",");
            
            const sqlQuery = `SELECT * FROM USER_PRODUCT p where p.userId = ? and p.active= 1 
            and p.serviceType in (?)`;
            con.query(sqlQuery, [userId, servicesType], (error, data) => {
                if (error) {
                    // NOTE(review): this runs inside the query callback, after the
                    // surrounding try block has already returned — presumably the
                    // catch below never sees it; confirm.
                    throw error;
                }
                if (data != null && data.length > 0) {
                    // forEach discards whatever getData returns, so the query below is
                    // issued without waiting for any asynchronous work started here.
                    data.forEach(getData);
                }
    
                // NOTE(review): as posted, this double-quoted literal is split across
                // two lines (likely a line-wrap artifact); it has to be on one line,
                // or be a template literal, to parse.
                const getSQLQuery = "select * from DATA p where p.userId = ? and p.serviceType 
                in (?) order by p.serviceType";
                con.query(getSQLQuery, [userId, servicesType], (error, data) => {
                    if (error) {
                        // NOTE(review): same concern as the throw in the outer callback.
                        throw error;
                    }
                    // Success: return the DATA rows with a permissive CORS header.
                    callback(null, {
                        statusCode: 200,
                        headers: {
                            "Access-Control-Allow-Origin": "*"
                        },
                        body: JSON.stringify({ status: 'success', data: data }),
                    });
                });
    
            });
        }
        catch (error) {
            // Handles exceptions raised while the try block itself is executing
            // (for example reading properties of a missing queryStringParameters).
            callback(null, {
                statusCode: 400,
                headers: {
                    "Access-Control-Allow-Origin": "*"
                },
                body: JSON.stringify({ status: 'error', message: "Invalid Request", error: errors }),
            })
        }
    };
    
    /**
     * Dispatches one USER_PRODUCT row to the processing routine for its serviceType:
     * 'SAMPLE1' goes to sample1Processing, 'SAMPLE2' to sample2Processing, and any
     * other value is ignored.
     *
     * The processing routines are async; they are started here but not awaited, and
     * this function returns nothing.
     *
     * @param {object} pObject - Row to process; `pObject.serviceType` selects the routine.
     */
    function getData(pObject) {
        switch (pObject.serviceType) {
            case 'SAMPLE1':
                sample1Processing(pObject);
                break;
            case 'SAMPLE2':
                sample2Processing(pObject);
                break;
            default:
                // code
        }
    }
    
    /**
     * Processing for 'SAMPLE1' rows: obtains an auth token, and when one is returned
     * fetches the sample data with it and passes the result to storeSample1Data.
     * When no token is returned it only logs a message.
     *
     * @param {object} pObject - Row being processed; forwarded to every helper called here.
     * @returns {Promise<void>}
     */
    async function sample1Processing(pObject) {
        let token = await getAuthToken(pObject);
        console.log("Auth token  :" + token);
        // getAuthToken yields null when its request fails, so a falsy token means "no token".
        if (typeof token !== 'undefined' && token) {
            let ppData = await getSample1Data(pObject, token);
            // NOTE(review): storeSample1Data is async but is not awaited, so this
            // function can resolve before the insert has been issued or finished.
            // NOTE(review): getSample1Data returns null when its request fails, and
            // storeSample1Data dereferences its first argument without a null check.
            storeSample1Data(ppData, pObject);
        }
        else {
            console.log("Did not get auth token");
        }
    
    }
    
    /**
     * Requests an OAuth2 access token using the client-credentials grant.
     *
     * Sends a form-encoded POST to `<baseURL>/v1/oauth2/token` with HTTP Basic
     * authentication built from the row's client id and secret.
     *
     * @param {object} pObject - Row holding the credentials and endpoint:
     *     `pObject.client`, `pObject.secret` and `pObject.baseURL` are read.
     * @returns {Promise<?string>} The `access_token` from the response, or null when
     *     the request (or reading its response) fails; failures are logged, not thrown.
     */
    async function getAuthToken(pObject) {
        // Every credential and the base URL are taken from the pObject argument;
        // no other object is in scope for this function.
        const basicCredentials = Buffer
            .from(`${pObject.client}:${pObject.secret}`)
            .toString('base64');

        // Named requestConfig so it does not shadow the module-level `config`.
        const requestConfig = {
            method: 'post',
            url: `${pObject.baseURL}/v1/oauth2/token`,
            headers: {
                'Authorization': `Basic ${basicCredentials}`,
                'Content-Type': 'application/x-www-form-urlencoded'
            },
            data: qs.stringify({
                'grant_type': 'client_credentials'
            })
        };

        let responseData = null;
        try {
            const response = await axios(requestConfig);
            console.log(JSON.stringify(response.data));
            responseData = response.data.access_token;
        }
        catch (error) {
            // Best effort by design: the caller treats a null token as "no token".
            console.log(error);
        }
        console.log(`Returning ${responseData} from auth token call`);
        return responseData;
    }
    
    /**
     * Fetches the transaction report for one row using a bearer token.
     *
     * Issues a GET to `<baseURL>/v1/reporting/transactions` for the fixed window
     * 2020-07-01T00:00:00Z .. 2020-07-30T23:59:59Z.
     *
     * @param {object} pObject - Row being processed; `pObject.baseURL` is read.
     * @param {string} token - Access token sent as `Authorization: Bearer <token>`.
     * @returns {Promise<?object>} The response body, or null when the request fails;
     *     failures are logged, not thrown.
     */
    async function getSample1Data(pObject, token) {
        // The base URL is taken from the pObject argument; no other object is in
        // scope for this function.
        const requestConfig = {
            method: 'get',
            url: `${pObject.baseURL}/v1/reporting/transactions?start_date=2020-07-01T00:00:00Z&end_date=2020-07-30T23:59:59Z`,
            headers: {
                'Authorization': `Bearer ${token}`
            }
        };

        try {
            const response = await axios(requestConfig);
            console.log(JSON.stringify(response.data));
            return response.data;
        }
        catch (error) {
            // Best effort by design: callers receive null instead of an exception.
            console.log(error);
            return null;
        }
    }
    
    /**
     * Builds one value row per entry of `data.transaction_details` and issues a
     * single bulk `INSERT ... on duplicate key update` into DATA through the shared
     * connection. Does nothing when `transaction_details` is empty.
     *
     * @param {object} data - Payload to store; `data.transaction_details.length` is read.
     *     NOTE(review): dereferenced without a null check, while getSample1Data
     *     returns null when its request fails.
     * @param {object} pObject - Row being processed; not referenced by the code
     *     visible here (the elided push arguments may use it).
     */
    async function storeSample1Data(data, pObject) {
        let customObjectArray = [];
        let customObject = [];
        if (data.transaction_details.length > 0) {
            for (let i = 0; i < data.transaction_details.length; i++) {
                customObject = [];
                // The column values were elided in the original post.
                customObject.push(.....);
                customObjectArray.push(customObject);
            }
            var sql = "INSERT INTO DATA (colum names) VALUES ? on duplicate key update updatedDate = ?";
            // NOTE(review): con.query is given a callback; unless it also returns a
            // promise, this await does not wait for the insert, and the throw inside
            // the callback will not reject this function — confirm against the mysql driver.
            await con.query(sql, [customObjectArray, new Date()], function(err) {
                if (err) throw err;
            });
            console.log('end');
        }
    }

  /**
   * Processing for 'SAMPLE2' rows: GETs `<baseURL>/v1/payment_intents` with a bearer
   * secret, passes the response body to storeSample2Data and returns that body.
   *
   * NOTE(review): `paymentObject` is not declared anywhere in the code shown; the
   * parameter is named `pObject`, so this presumably should read `pObject.baseURL`
   * and `pObject.secret` — confirm.
   *
   * @param {object} pObject - Row being processed; forwarded to storeSample2Data.
   * @returns {Promise<object>} The body of the payment_intents response. A failed
   *     request is not caught here, so the returned promise rejects.
   */
  async function sample2Processing(pObject) {
        // Stringifying an empty object; the result is sent as the body of the GET request.
        var data = qs.stringify({});
        var config = {
            method: 'get',
            url: paymentObject.baseURL + '/v1/payment_intents',
            headers: {
                'Authorization': 'Bearer ' + paymentObject.secret
            },
            data: data
        };
    
        let res = await axios(config);
        let data1 = res.data;
        // NOTE(review): storeSample2Data is async but is not awaited, so this
        // function can resolve before the insert has been issued or finished.
        storeSample2Data(data1, pObject);
        return data1;
    }

    
    /**
     * Builds one value row per entry of `data.data` and issues a single bulk
     * `INSERT ... on duplicate key update` into DATA through the shared connection.
     * Does nothing when `data.data` is empty.
     *
     * @param {object} data - Payload to store; `data.data.length` is read.
     * @param {object} pObject - Row being processed; not referenced by the code
     *     visible here (the elided push arguments may use it).
     */
    async function storeSample2Data(data, pObject) {
        let customObjectArray = [];
        let customObject = [];
        if (data.data.length > 0) {
            for (let i = 0; i < data.data.length; i++) {
                customObject = [];
                // The column values were elided in the original post.
                customObject.push(.....)
                customObjectArray.push(customObject);
            }
            var sql = "INSERT INTO DATA (column names) VALUES ? on duplicate key update updatedDate = ?";
            // NOTE(review): con.query is given a callback; unless it also returns a
            // promise, this await does not wait for the insert, and the throw inside
            // the callback will not reject this function — confirm against the mysql driver.
            await con.query(sql, [customObjectArray, new Date()], function(err) {
                if (err) throw err;
            });
            console.log('end');
        }
    }

问题是有时 API 调用不会发生,有时只有其中一些调用通过。有时一切都按要求完成,我得到了输出。

我知道这是 JS 的异步特性造成的,因此我所有的异步调用都是通过 promise 或 async/await 进行的。

【问题讨论】:

    标签: javascript node.js aws-lambda axios


    【解决方案1】:

    你应该 await 对 sample1Processing 和 sample2Processing 的调用:

        /**
         * Dispatches one row to the processing routine matching its serviceType and
         * awaits that routine, so the promise returned here settles only after the
         * selected processing call has settled. Unknown service types are ignored.
         *
         * @param {object} pObject - Row to process; `pObject.serviceType` selects the routine.
         * @returns {Promise<void>}
         */
        async function getData(pObject) {
            switch (pObject.serviceType) {
                case 'SAMPLE1':
                    await sample1Processing(pObject);
                    break;
                case 'SAMPLE2':
                    await sample2Processing(pObject);
                    break;
                default:
                    // code
            }
        }
    

    同样,也要分别在 sample1Processing 和 sample2Processing 中 await 对 storeSample1Data 和 storeSample2Data 的调用。

    还需要修改调用 getData 的循环,使其能够处理异步调用,因此不要写成

    con.query(sqlQuery, [userId, servicesType], (error, data) => {
       if (error) {
          throw error;
       }
       if (data != null && data.length > 0) {
          data.forEach(getData);
       }
       ...
    

    做类似的事情

    con.query(sqlQuery, [userId, servicesType], async (error, data) => {
       if (error) {
          throw error;
       }
       if (data != null && data.length > 0) {
          for (const d of data) {
            await getData(d);
          }
       }
       ...
    

    【讨论】:

    • 谢谢。看起来你解决了这个问题。如果一切顺利,几个小时后我会把它标记为答案。再次感谢。
    猜你喜欢
    • 2021-09-28
    • 1970-01-01
    • 2021-10-13
    • 2018-10-27
    • 2020-05-31
    • 1970-01-01
    • 2018-04-26
    • 1970-01-01
    • 2022-07-20
    相关资源
    最近更新 更多