【问题标题】:request-promise causing too many sockets to be openrequest-promise 导致打开的套接字过多
【发布时间】:2017-04-01 11:32:31
【问题描述】:

我目前正在尝试编写一个函数,该函数接收用户输入并根据单词在训练数据中出现的次数返回用户评论可能属于的顶级 subreddit。我有一个数据库,其中包含不同 subreddit 中单词的频率,我正在为每个 subreddit 和用户输入中的每个单词发出 GET 请求。

这增加了许多获取请求,因为我的数据库中有超过 7000 个子版块。我目前正在发出请求承诺(浅)请求以获取所有 subreddits 的列表。然后,对于每个 subreddit,我会遍历用户输入中的每个单词,创建另一个 request-promise 对象并将其添加到 promise 数组中。

一旦添加了所有 request-promise 对象,我会等到它们都使用 Promise.all 解决,然后我尝试打印给定 subreddit 的词频数组,但我得到一个“错误:连接EMFILE 的消息。

根据关于堆栈溢出的另一篇文章,这意味着我打开了太多套接字,但我对这怎么会发生感到困惑。根据我的理解,它不会一次只打开最多 user_words.length 可能的连接,因为这些是在 Promise.all 等待解决时正在完成的请求?我看不出连接是如何关闭的。

提前感谢您的帮助!

function getBestSubreddit(messageText) {
  var user_words = parse_message(messageText);
  var top_subreddit = "";
  var top_score = Number.MIN_SAFE_INTEGER;
  rp(dbUrl + '/.json?shallow=true').then(function(res) {
    res = JSON.parse(res);
    for (var subreddit in res) {
      if (res.hasOwnProperty(subreddit)) {
        var score = 0.0;
        var promises = []
        for (var i = 0; i < user_words.length; i++) {
          promises.push(rp(dbUrl + '/' + subreddit + '/word_freqs/' + user_words[i] + '.json'));
        }
        Promise.all(promises).then(function(values) {
          console.log(values);
        }, function(err) {
          console.log(err);
        });
      }
    }
  }).catch(function(err) {
    console.log(err);
  })
  return top_subreddit;
}

【问题讨论】:

标签: node.js request promise request-promise


【解决方案1】:

根据我的理解,由于这些是在 Promise.all 等待解决时正在完成的请求,它不会一次打开最多 user_words.length 可能的连接吗?我看不出连接是如何关闭的。

不,这是不正确的。您有两个嵌套的for 循环,因此您可以同时打开多达user_words.length * how many subreddits there are。请记住,rp()Promise.all() 不会阻塞,因此您可以运行嵌套的 for 循环以在处理任何响应之前完成启动每个连接。

您似乎还期望以某种方式与代码行return top_subreddit 同步返回结果。你也不能那样做。您应该返回一个最终将解决为所需结果的承诺。

根据我的理解,由于这些是在 Promise.all 等待解决时正在完成的请求,它不会一次打开最多 user_words.length 可能的连接吗?我看不出连接是如何关闭的。

这不是Promise.all()的正确理解。 Promise.all() 没有阻塞。在您的代码继续退出之前,它不会“等待”所有承诺都得到解决。它的行为是异步的。您的代码继续执行您的for 循环的其他迭代,Promise.all() 在将来某个时候调用它的.then() 处理程序,当您通过它的所有承诺都完成时。 for 循环的其他迭代继续运行并堆积更多套接字。

我认为解决此问题的最简单方法是创建一个要处理的 URL 数组,然后使用其中一个已经具有内置函数的异步库,以允许您运行最多 N 个异步操作同时在飞行中。由于您的代码是基于承诺的,我会选择 Bluebird 的 Promise.map() 来处理 URL 列表。

var Promise = require('bluebird');

function getBestSubreddit(messageText) {
  var user_words = parse_message(messageText);
  var top_subreddit = "";
  var top_score = Number.MIN_SAFE_INTEGER;
  return rp(dbUrl + '/.json?shallow=true').then(function(res) {
    res = JSON.parse(res);
    // build a list of URLs to process
    var urls = [];
    for (var subreddit in res) {
      if (res.hasOwnProperty(subreddit)) {
        for (var i = 0; i < user_words.length; i++) {
          urls.push(dbUrl + '/' + subreddit + '/word_freqs/' + user_words[i] + '.json');
        }
      }
    }
    // 
    return Promise.map(urls, function(url) {
        return rp(url);
    }, {concurrency: 20}).then(function(allResults) {
        // do any final processing of allResults here and return that value
        // to become the resolved result of the returned promise
    });
  }
}

getBestSubreddit(someText).then(function(result) {
    // process result here
}).catch(function(err) {
    // handle error here
});

在本例中,我将并发请求数设置为 20。您可以试验将其更改为更高或更低的数字是否会提高您的吞吐量。理想的数字取决于许多因素,包括您的本地执行环境、您请求的数据量、您拥有的带宽以及您从其发出请求的目标主机以及它如何处理同时请求。如果您过快地发出太多请求,您可能还需要关注目标的速率限制。

其他一些相关答案:

How to make millions of parallel http requests from nodejs app?

In Node js. How many simultaneous requests can I send with the "request" package

Making a million requests


我仍然不清楚你的问题到底是什么结果,但这是一个收集所有可能数据的版本。你最终会得到一个{result: result, subreddit: subreddit, word: word} 形式的对象数组,其中result 是你的rp() 对给定子目录和给定单词的结果。然后,您可以根据需要整理该组结果:

var Promise = require('bluebird');

function getBestSubreddit(messageText) {
  var user_words = parse_message(messageText);
  var top_subreddit = "";
  var top_score = Number.MIN_SAFE_INTEGER;
  return rp(dbUrl + '/.json?shallow=true').then(function(res) {
    res = JSON.parse(res);
    // build a list of URLs to process
    var requestData = [];
    for (var subreddit in res) {
      if (res.hasOwnProperty(subreddit)) {
        for (var i = 0; i < user_words.length; i++) {
          requestData.push({url:dbUrl + '/' + subreddit + '/word_freqs/' + user_words[i] + '.json', subreddit: subreddit, word: user_words[i]});
        }
      }
    }
    // 
    return Promise.map(requestData, function(url) {
        return rp(requestData.url).then(function(result) {
            return {result: result, subreddit: requestData.subreddit, word: requestData.word};
        });
    }, {concurrency: 20}).then(function(allResults) {
        // now filter through all the data with appropriate subreddit
        // allResults is an array of objects of this form {result: result, subreddit: subreddit, word: word}
        // return whatever you want the final result to be after processing the allResults array
    });
  }
}

getBestSubreddit(someText).then(function(result) {
    // process result here
}).catch(function(err) {
    // handle error here
});

【讨论】:

  • 谢谢!我认为您提供的代码会返回每个 subreddit 中每个频率的列表,因为它将每个可能的 URL 添加到 urls 数组中,然后使用 Promise.map() 将并发限制为一次最多 20 个。有没有办法获得每个 subreddit 的词频列表?我想计算每个 subreddit 的分数,然后使用该分数来确定顶部 subreddit。为此,我会为每个单独的 subreddit 做一个单独的 Promise.map() 吗?
  • @cheese123211 - 你的代码没有显示你想做什么来处理结果,所以我不能包含那种类型的代码,甚至不知道你想对结果做什么。我向您展示了如何获得所有结果以及如何控制一次有多少请求正在进行中。您应该能够自行处理结果。
  • @cheese123211 - 是的,您可以为每个子 reddit 累积结果。您问题中的代码未显示您要对结果执行的操作,因此我没有任何可显示的相关代码。
  • @cheese123211 - 由于我无法从您的问题中确切了解您想要获得的结果,因此我添加了一个新版本,该版本返回包含 {word, subreddit, rpResult} 的对象数组对于每个请求,您应该能够处理该对象数组以计算您所追求的任何结果。
【解决方案2】:

问题源于两个嵌套循环和未节流的rp() 调用,导致许多同时请求。

节流通常通过以下方式实现:

  • 通过构建then() 链进行序列化,例如通过减少数组。
  • 实施“并发”限制,例如使用 Bluebird 的 Promise.map() 及其 concurrency 选项。

我想这个特定问题必须有多种方法,但本质上是:

  • 汇集所有请求并按并发限制(jFriend00 的回答),
  • 允许一个循环保持同步并通过序列化或并发限制另一个循环,
  • 在序列化中嵌套序列化,
  • 在并发中嵌套并发,
  • 采用序列化和并发的混合方法。

这是一种混合方法,其中:

  • 原始外部循环被序列化限制
  • 原来的内部循环被 Bluebird 的并发地图限制了。
function getSubreddits(messageText) {
    var user_words = parse_message(messageText);
    return rp(dbUrl + '/.json?shallow=true').then(function(res) {
        var subreddits = Object.keys(JSON.parse(res));
        return subreddits.reduce(function(p, subreddit) {
            return p.then(function() {
                return Promise.map(user_words, function(word) {
                    return rp(dbUrl + '/' + subreddit + '/word_freqs/' + word + '.json');
                }, {concurrency: 10}).then(function(freqs) {
                    // return an object that associates each subreddit with its results
                    return {
                        'subreddit': subreddit, // or maybe the object for which `subreddit` is the key?
                        'word_freqs': freqs
                    };
                });
            });
        }, Promise.resolve());
    });
}

缺点是你最终会得到一个深深嵌套的眼睛,它不适合扁平化。也就是说,大多数(如果不是全部)其他方法都是相似的。

无论您采用哪种方法,getBestSubreddit() 现在都将包含对getSubreddits() 的调用以及对结果的一些后处理。

function getBestSubreddit(messageText) {
    return getSubreddits(messageText).then(function(results) {
        // Here `results` is an array of `{'subreddit', 'word_freqs'}` objects.
        // Loop through and calculate a score for each subreddit,
        // then use that score to determine the top subreddit,
        // and return it.
    }).catch(function(error) {
        console.log(error);
    });
}

【讨论】:

    猜你喜欢
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2010-12-26
    • 1970-01-01
    • 2010-10-27
    • 1970-01-01
    相关资源
    最近更新 更多