【问题标题】:Synchronous Multiple Requests in foor loop with Node js使用Node js在for循环中同步多个请求
【发布时间】:2017-12-26 18:42:43
【问题描述】:

我是从 Javascript 开始的,我需要帮助来弄清楚如何使这段代码在循环通过 for 循环时同步。 基本上我正在做的是在 for 循环中发出多个 POST 请求,然后我使用库 X-Ray 报废数据,最后我将结果保存到 Mongo 数据库。 输出没问题,但它以无序的方式出现并突然挂起,我必须使用 ctrl+C 强制关闭。这是我的功能:

  function getdata() {
  const startYear = 1996;
  const currentYear = 1998; // new Date().getFullYear()

for (let i = startYear; i <= currentYear; i++) {
for (let j = 1; j <= 12; j++) {
  if (i === startYear) {
    j = 12;
  }

  // Form to be sent
  const form = {
    year: `${i}`,
    month: `${j}`,
    day: '01',
  };

  const formData = querystring.stringify(form);
  const contentLength = formData.length;

  // Make HTTP Request
  request({
    headers: {
      'Content-Length': contentLength,
      'Content-Type': 'application/x-www-form-urlencoded',
    },
    uri: 'https://www.ipma.pt/pt/geofisica/sismologia/',
    body: formData,
    method: 'POST',
  }, (err, res, html) => {

    if (!err && res.statusCode === 200) {

      // Scrapping data with X-Ray
      x(html, '#divID0 > table > tr', {
        date: '.block90w',
        lat: 'td:nth-child(2)',
        lon: 'td:nth-child(3)',
        prof: 'td:nth-child(4)',
        mag: 'td:nth-child(5)',
        local: 'td:nth-child(6)',
        degree: 'td:nth-child(7)',
      })((error, obj) => {

        const result = {
          date: obj.date,
          lat: obj.lat.replace(',', '.'),
          lon: obj.lon.replace(',', '.'),
          prof: obj.prof == '-' ? null : obj.prof.replace(',', '.'),
          mag: obj.mag.replace(',', '.'),
          local: obj.local,
          degree: obj.degree,
        };

        // console.log(result);

        upsertEarthquake(result); // save to DB

      });

    }


  });

  }
  }
  }

我想我必须使用承诺或回调,但我不明白如何做到这一点,我已经尝试使用异步等待但没有成功。如果需要提供任何其他信息,请告诉我,谢谢。

【问题讨论】:

    标签: javascript node.js web-scraping request x-ray


    【解决方案1】:

    您正在循环中调用请求。

    异步函数是在主线程逻辑结束后获取结果(A.K.A.,在回调函数中接收响应)的函数。

    这样,如果我们有这个:

    for (var i = 0; i < 12; i++) {
        request({
            data: i
        }, function(error, data) {
            // This is the request result, inside a callback function
        });
    }
    

    逻辑将在调用回调之前运行超过 12 个requests,因此回调将被堆叠并在所有主循环运行后调用。

    如果不输入所有 ES6 生成器的东西(我认为这会使它变得更复杂,并且在低级别学习正在发生的事情对你来说更好),你要做的就是调用 request ,等待他的回调函数被调用,然后调用下一个request。怎么做?有很多方法,但我通常是这样的:

    var i= 0;
    function callNext() {
        if (i>= 12) {
            requestEnded();
        } else {
            request({
                data: i++ // Increment the counter as we are not inside a for loop that increments it
            }, function(error, data) {
                // Do something with the data, and also check if an error was received and act accordingly, which is very much possible when talking about internet requests
                console.log(error, data);
                // Call the next request inside the callback, so we are sure that the next request is ran just after this request has ended
                callNext();
            })
        }
    }
    callNext();
    
    requestEnded() {
        console.log("Yay");
    }
    

    这里你看到了逻辑。您有一个名为 callNext 的函数,它将进行下一次调用,如果不再需要调用,则调用 requestEnded

    requestcallNext 中被调用时,它会等待回调被接收(这将在未来的某个时间异步发生),将处理接收到的数据,然后在你告诉他调用的回调中再次callNext

    【讨论】:

    • 我试图实现它,但输出仍然是无序的,尽管它不再挂起。你可以在这里看到我做了什么pastebin.com/J0PdG9rv
    • @MiguelFerreira 你明白了。使用此信息,您创建了一种连接异步回调的方法,这很酷,但我仍然看到一个问题。您在另一个异步请求中有一个异步请求。在request 回调中调用x,它也有一个异步回调。您希望在所有异步回调完成后完成下一个请求。我已经稍微调整了你的 pastebin:pastebin.com/keshVjXx 在你得到你想要的结果之后,检查我在哪里调用getdata。至此,您已完成并可以继续下一个请求。
    • 请注意,如果您遇到错误或导致您不调用 x 函数的原因,它将不会继续下一个请求。想想这是否是你真正想要的。如果发生这种情况,您可能想重复请求,或者从头开始。随你(由你决定。只需添加一个else 并添加您想要在“请求链”中断时运行的代码。
    • 感谢 jorge 付出的时间和精力,您的解决方案很简单,并且为您完美 +1!编辑:我不能投票,因为我仍然有
    • @MiguelFerreira 哈哈,没问题。很高兴它起作用了:-)
    【解决方案2】:

    您可以使用开始和结束年份创建一个数组,而不是循环,然后将其映射到您的请求的配置,然后将结果映射到 X 射线返回的内容(X 射线返回 promise like 所以不需要打回来)。然后使用返回 promise 的函数将抓取的结果放入 mongodb。

    如果有东西拒绝,则创建一个 Fail 类型的对象并使用该对象进行解析。

    使用 Promise.all 并行启动所有请求、x-ray 和 mongo,但使用 throttle 限制活动请求的数量。

    这是代码中的样子:

    //you can get library containing throttle here:
    //  https://github.com/amsterdamharu/lib/blob/master/src/index.js
    const lib = require('lib');
    const Fail = function(details){this.details=details;};
    const isFail = o=>(o&&o.constructor)===Fail;
    const max10 = lib.throttle(10);
    const range = lib.range;
    const createYearMonth = (startYear,endYear)=>
      range(startYear,endYear)
      .reduce(
        (acc,year)=>
          acc.concat(
            range(1,12).map(month=>({year,month}))
          )
        ,[]
      );
    const toRequestConfigs = yearMonths =>
      yearMonths.map(
        yearMonth=>{
          const formData = querystring.stringify(yearMonth);
          return {
            headers: {
              'Content-Length': formData.length,
              'Content-Type': 'application/x-www-form-urlencoded',
            },
            uri: 'https://www.ipma.pt/pt/geofisica/sismologia/',
            body: formData,
            method: 'POST',
          };
        }
      );
    const scrape = html =>
      x(
        html, 
        '#divID0 > table > tr', 
        {
          date: '.block90w',
          lat: 'td:nth-child(2)',
          lon: 'td:nth-child(3)',
          prof: 'td:nth-child(4)',
          mag: 'td:nth-child(5)',
          local: 'td:nth-child(6)',
          degree: 'td:nth-child(7)'
        }
      );
    const requestAsPromise = config =>
      new Promise(
        (resolve,reject)=>
          request(
            config,
            (err,res,html)=>
              (!err && res.statusCode === 200) 
                //x-ray returns a promise:
                // https://github.com/matthewmueller/x-ray#xraythencb
                ? resolve(html)
                : reject(err)
          )
      );
    const someMongoStuff = scrapeResult =>
      //do mongo stuff and return promise
      scrapeResult;
    const getData = (startYear,endYear) =>
      Promise.all(
        toRequestConfigs(
          createYearMonth(startYear,endYear)
        )
        .map(
          config=>
            //maximum 10 active requests
            max10(requestAsPromise)(config)
            .then(scrape)
            .then(someMongoStuff)
            .catch(//if something goes wrong create a Fail type object
              err => new Fail([err,config.body])
            )
        )
      )
    //how to use:
    getData(1980,1982)
    .then(//will always resolve unless toRequestConfigs or createYearMonth throws
      result=>{
        //items that were successfull
        const successes = result.filter(item=>!isFail(item));
        //items that failed
        const failed = result.filter(isFail);
      }
    )
    

    抓取时经常发生的情况是,目标站点不允许您在 y 周期内发出超过 x 次请求,并开始将您的 IP 列入黑名单并在您超过此限制时拒绝服务。

    假设您想限制每 5 秒 10 个请求,那么您可以将上面的代码更改为:

    const max10 = lib.throttlePeriod(10,5000);
    

    其余代码相同

    【讨论】:

    • 非常感谢您的时间和精力,但 jorge 回答得更快,解决方案也更简单。
    • @MiguelFerreira 没问题,如果您需要帮助来实施此解决方案,因为您了解限制并行与抓取的关系,或者您是否因为一个失败而丢失所有成功处理的项目,或者如果您了解两者之间的区别,请告诉我命令式和声明式编程。
    【解决方案3】:

    你有 sync for...loop 里面有 async methods 的问题。

    解决此问题的一种干净方法是使用

    ES2017 async/await 语法

    假设您想在upsertEarthquake(result) 之后停止每次迭代,您应该将代码更改为类似的内容。

    function async getdata() {
        const startYear = 1996;
        const currentYear = 1998; // new Date().getFullYear()
    
        for (let i = startYear; i <= currentYear; i++) {
            for (let j = 1; j <= 12; j++) {
                if (i === startYear)
                    j = 12; 
    
                // Form to be sent
                const form = {
                    year: `${i}`,
                    month: `${j}`,
                    day: '01',
                };
    
                const formData = querystring.stringify(form);
                const contentLength = formData.length;
                //Make HTTP Request
                await new Promise((next, reject)=> { 
                    request({
                        headers: {
                            'Content-Length': contentLength,
                            'Content-Type': 'application/x-www-form-urlencoded',
                        },
                        uri: 'https://www.ipma.pt/pt/geofisica/sismologia/',
                        body: formData,
                        method: 'POST',
                    }, (err, res, html) => {
                        if (err || res.statusCode !== 200)
                            return next() //If there is an error jump to the next
    
                        //Scrapping data with X-Ray
                        x(html, '#divID0 > table > tr', {
                            date: '.block90w',
                            lat: 'td:nth-child(2)',
                            lon: 'td:nth-child(3)',
                            prof: 'td:nth-child(4)',
                            mag: 'td:nth-child(5)',
                            local: 'td:nth-child(6)',
                            degree: 'td:nth-child(7)',
                        })((error, obj) => {
                            const result = {
                                date: obj.date,
                                lat: obj.lat.replace(',', '.'),
                                lon: obj.lon.replace(',', '.'),
                                prof: obj.prof == '-' ? null : obj.prof.replace(',', '.'),
                                mag: obj.mag.replace(',', '.'),
                                local: obj.local,
                                degree: obj.degree,
                            }
                            //console.log(result);
                            upsertEarthquake(result); // save to DB
                            next() //This makes jump to the next for... iteration
                        })
    
                    }) 
                }
            }
        }
    }
    

    我假设upsertEarthquake 是一个异步函数或者是触发后忘记的类型。

    如果有错误你可以使用next(),但是如果你想打破循环,使用reject()

    if (err || res.statusCode !== 200)
        return reject(err)
    

    【讨论】:

      猜你喜欢
      • 1970-01-01
      • 2018-11-24
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 2020-12-05
      • 1970-01-01
      相关资源
      最近更新 更多