【问题标题】:Async.eachSeries: Callback already calledAsync.eachSeries:已调用回调
【发布时间】:2018-02-09 03:07:19
【问题描述】:

这是一个有点长和凌乱的代码,但请耐心等待,因为我需要完成这个。

我正在尝试为每个用户更新一个 json 对象。我希望循环迭代等待异步过程结束以避免竞争条件。但是,这导致了回调地狱,现在我无法确定每个回调返回的正确位置。

我在 Nesting async.eachSeries 上提到了这个答案,并尝试根据它来构建我的代码。但它仍然不起作用。代码在 callback1() 处给出回调已经调用错误。

    async.eachOfSeries(res, function (value, camp, callback3) {
let _id = res[camp]._id;
let arr = res[camp].campaignID;
async.eachOfSeries(arr, function2, function (err) {
    callback3();
})


function function2(value1, i, callback2) {
    let users = arr[i].users;
    let id = arr[i].id;
    let loop = Math.ceil(users / 1000);
    let limit = 0,
        offset = 0;
    for (let j = 0; j < loop; j++) {
        if (users > 1000) {
            limit = 1000;
            users -= limit;
        } else {
            limit = users;
        }
        console.log(limit + " limit " + offset + " offset");
        var start = Date.now();
        while (Date.now() < start + 100) {}
        const request = mailjet
            .get("messagesentstatistics")
            .request({
                "CampaignID": id,
                "AllMessages": true,
                "Limit": limit,
                "Offset": offset
            })
        request
            .then((result) => {
                let data = result.body.Data;
                var loop = 0;
                async.eachOfSeries(data, function1, function (err) {
                    console.log("function");
                    callback2();
                })
                console.log("oooooo");
            })
            .catch((err) => {
                console.log(err);
            })
        offset += limit;
    }

    function function1(value2, val, callback1) {
        console.log(data +" data");
        let jsonObj = data[val];
        let email = jsonObj.ToEmail;
        jsonObj['retailer'] = res[camp].retailer;
        jsonObj['summary'] = 'f';
        let tempObj = {};
        tempObj[id] = jsonObj;
        let options = {
            new: true
        };
        let campId = id;
        User.addCampaignResponse(email, campId, tempObj, options, function (err, results) {
            if (err) {
                throw err;
            } else {
                console.log("aasd");
                Campaign.updateResponse(_id, function (err, results2) {
                    if (err)
                        throw err;
                    else {
                        console.log("asdasaadas");
                        callback1();
                    }
                }) // console.log(results);
            }
        })
    }

}

}, function (err) {
    callback(undefined, "doneeeeee");
})

还有比这更好的方法吗?我也可以在某个地方使用瀑布吗?我可以更改回调位置以避免错误吗?


编辑:简化代码

function function2(value1, i, callback2) {
    // ...
    const request = mailjet
                    .get("messagesentstatistics")
                    .request({
                       // ...
                    });
    request
       .then((result) => {
          // ...
          async.eachOfSeries(data, function1, function (err) {
            callback2();
          });
        })
        .catch((err) => {
          // ...
        });
    }

function function1(value2, val, callback1) {
  // ...
  User.addCampaignResponse(email, campId, tempObj, options, function (err, results) {
    if (err) {
      throw err;
    } else {
      Campaign.updateResponse(_id, function (err, results2) {
        if (err) throw err;
        else callback1();
      });
    }
  });
}

async.eachOfSeries(res, function (value, camp, callback3) {
    // ...

    async.eachOfSeries(arr, function2, function (err) {
      callback3();
    });

  },
  function (err) {
    callback(undefined, "doneeeeee");
  });

【问题讨论】:

  • var start = Date.now(); while (Date.now() &lt; start + 100) {} -- 好的,我只需要问一下。为什么?
  • 为了便于阅读,如果您使用命名函数而不是匿名函数,将会有很大帮助。
  • 如果要整理,先改代码,把一些部分放到函数中,这样会更容易阅读,便于理解和调试
  • @MisterKartoot 有很多比同步阻塞线程更好的节流解决方案......
  • @GrégoryNEUT 提出了一个很好的观点,即把一些功能拉到他们自己的部分。您的代码似乎做了很多循环并且非常复杂,所以我不确定它到底做了什么。然而,我已经重写了其中的一些,以展示如何将其重组为单独的函数:这​​里是 gist

标签: javascript node.js asynchronous callback async.js


【解决方案1】:

我会这样做的。

我们使用if (err) return callback(err); 来停止当前的异步函数并将错误发送到更高级别。

async.eachSeries(res, function (r, callback1) {

    let _id = r._id;
    let arr = r.campaignID;

    async.eachSeries(arr, function firstLevel (a, callback2) {

        let users = a.users;
        let id = a.id;
        let loop = Math.ceil(users / 1000);
        let limit = 0, offset = 0;

        // for loop is synchronous whereas mailjet is asynchronous -> usually bad idea to mix those two
        // instead try async.timesSeries()
        async.timesSeries(loop, function getSentMessages (n, callback3) {

            if (users > 1000) {
                limit = 1000;
                users -= limit;
            } else {
                limit = users;
            }
            console.log(n, limit, "limit", offset, "offset");

            var start = Date.now();
            while (Date.now() < start + 100) {} // this does nothing...

            // async.js doesn't flow well with Promises so request your resource with a callback function
            mailjet
                .get("messagesentstatistics")
                .request({ CampaignID: id, AllMessages: true, Limit: limit, Offset: offset })
                .request(function (err, result, body) {

                // stop everything if an error occurred; send the error back up
                if (err) return callback3(err);

                let data = result.body.Data;
                var loop = 0;

                async.eachSeries(data, secondLevel (jsonObj, callback4) {
                    let email = jsonObj.ToEmail;
                    jsonObj.retailer = r.retailer;
                    jsonObj.summary = 'f';
                    let tempObj = {};
                    tempObj[id] = jsonObj;
                    let options = { new: true };
                    let campId = id;
                    User.addCampaignResponse(email, campId, tempObj, options, function (err, results) {
                        // stop everything if an error occurred; send the error back up
                        if (err) return callback4(err);

                        console.log("added campaign response");

                        Campaign.updateResponse(_id, function (err, results2) {
                            // stop everything if an error occurred; send the error back up
                            if (err) return callback4(err);

                            console.log("updated campaign response");

                            callback4();
                        });
                    })
                }, callback3);

            }); // end of mailjet

            offset += limit;

        }, callback2); // end of async.timesSeries

    }, callback1); // end of async.eachOfSeries

}, function (err) {
    // if an error occurs anywhere, it should back here
    if (err) {
        console.log(err);
        return;
    }
    console.log("doneeeeee");
});

此外,使用有意义的变量和函数名称总是更好。

【讨论】:

  • 我可以看到您在构建代码和提供我认为可以帮助我的解决方案方面付出了巨大的努力。但是代码给出了最内部的异步语句“,”的错误,我不知道为什么。
  • @MisterKartoot 我的代码中有一个小错字。检查“已编辑...”链接以查看位置。我不确定这是否会解决它。我还建议您将 console.log 语句放在不同的断点处,看看发生了什么。
【解决方案2】:

这是它的简化版


function secondLevel(value2, val, callback) {
    // ...
    User.addCampaignResponse(email, campId, tempObj, options, function (err, results) {
       if (err) {
          throw err;
       }

       Campaign.updateResponse(_id, function (err, results2) {
          // throw the error to the highest level if there is one
          if (err) throw err;

          // Finish the eachOfSeries if all is ok
          callback();
       });
    });
 }

function firstLevel(value1, i, callback) {
  // ...
  mailjet
    .get("messagesentstatistics")
    .request({
      // ...
    })
    .then((result) => {
      // ...
      async.eachOfSeries(data, secondLevel, function (err) {
        // throw the error to the highest level if there is one
        if (err) throw err;

        // Finish the eachOfSeries if all is ok
        callback();
      });
    })
    .catch((err) => {
      // throw the error to the highest level
      throw err;
    });
}

 //
 // Start of our program
 //
 async.eachOfSeries(res, function (value, camp, callback) {
     // ...
     async.eachOfSeries(arr, firstLevel, function (err) {
       // throw the error to the highest level if there is one
       if (!err) throw err;

       // Finish the eachOfSeries if all is ok
       callback();
     });
 },
 function (err) {
     if (err) {
        // Here we re done with an error
        // ...
        return;
     }

     // We did it, no, error
     // ...
 });

【讨论】:

  • 我尝试使用这个重构,它说 arr 没有定义,这意味着第一级函数看不到 arr。
  • 所以将其作为数据传递
  • 当试图访问函数内部时,它说 arr 是未定义的。
  • 您确实在顶级范围内将其声明为let arr = res[camp].campaignID;。我敢打赌,在根据您的情况调整我的答案时,您犯了一个错误。再看一遍,试着理解我做了什么并改变你的代码。
猜你喜欢
  • 1970-01-01
  • 2014-03-10
  • 1970-01-01
  • 2016-08-27
  • 1970-01-01
  • 1970-01-01
  • 2015-08-13
  • 2017-10-13
  • 2016-01-26
相关资源
最近更新 更多