【问题标题】:Concurrency limit in Q promises - nodeQ Promise 中的并发限制 - 节点
【发布时间】:2014-04-17 11:13:41
【问题描述】:

有没有什么方法可以使用 Q Promise 库来限制 Promise 的并发性?

这个问题和How can I limit Q promise concurrency?有点关系

但问题是我正在尝试做这样的事情:

for (var i = 0; i <= 1000; i++) {
  return Q.all([ task1(i), task2(i) ]); // <-- limit this to 2 at a time.
}

真正的用例是:

  1. 从数据库中获取帖子
  2. 循环数据库中的每个帖子,如posts.forEach(function(post) {}
  3. 为每个帖子执行 task1、task2、task3(检索社交计数器、检索 cmets 计数等)
  4. 在数据库中保存新的帖子数据。

但问题是节点同时为所有帖子执行所有任务,例如同时向 facebook 询问 500 个帖子的“点赞数”。

我如何限制Q.all() 以便一次只有 2 个帖子正在执行他们的任务?或者这里可以应用哪些其他可能的解决方案?

注意:大部分任务(如果不是全部)都依赖于request library

【问题讨论】:

标签: javascript node.js asynchronous concurrency q


【解决方案1】:

前几天我问了一个很相似的问题:Node.js/Express and parallel queues

我找到的解决方案(请参阅我自己的答案)是使用Caolan's async。它允许您创建“操作队列”,并且您可以限制可以同时运行的数量:参见“队列”方法。

在您的情况下,Node 的主循环将从 Q 中提取元素并在队列中为每个元素创建一个任务。您也可以限制这一点(因此基本上不会在 Q 之外重新创建队列),例如仅在执行最后一个元素时向队列添加 N 个新元素(“队列”方法的“空”回调)。

【讨论】:

    【解决方案2】:

    感谢 Dan,他的 answer 以及他帮助我将其与我的代码集成的帮助,可以使用他的 gist 和这样的片段来完成:

    var qlimit = require('../libs/qlimit');
    
    var test = function(id) {
      console.log('Running ' + id);
      return Q.nfcall(request, 'some dummy url which takes some time to process, for example a php file with sleep(5)').spread(function(response, body) {
        console.log('Response ' + id);
        return body;
      });
    }
    
    test = qlimit.limitConcurrency(test, 1);
    
    var data = [0, 1, 2];
    
    data.forEach(function(id) {
      console.log('Starting item ' + id);
      Q.all([ test(id) ]);
    });
    

    这样你会得到类似的东西:

    • 起始项目 0
    • 起始项目 1
    • 起始项目 2
    • 正在运行 0
    • 响应 0
    • 正在运行 1
    • 响应 1
    • 运行 2
    • 响应 2

    这显然是一次 1 个请求。

    我在实现中缺少的全部要点是您需要在开始循环之前使用limitConcurrency重新声明函数,而不是在循环内部。

    【讨论】:

      【解决方案3】:

      Here 是我用来限制 q 承诺的代码。

      我刚刚从一个我需要它的项目中取出它。如果有更多人感兴趣,我可以把它分成一个模块或其他东西。

      【讨论】:

        【解决方案4】:

        查看方法spex.pagespex.sequence。它们旨在为 data throttling + load balancing 实现任何可能的承诺策略。

        请参阅以下项目文档中的几个示例。

        平衡的页面来源

        下面的示例使用方法page 启动5 个页面的序列,然后将解析的数据记录到控制台。源函数为每个页面提供半秒的延迟。

        var $q = require('q');    
        var spex = require('spex')($q);
        
        function source(index, data, delay) {
            return new $q.Promise(function (resolve, reject) {
                setTimeout(function () {
                    resolve([
                        "page-" + index, // simple value;
                        $q.resolve(Date.now()) // promise value;
                    ])
                }, 500); // wait 1/2 second before serving the next page;
            });
        }
        
        function logger(index, data, delay) {
            console.log("LOG:", data);
        }
        
        spex.page(source, {dest: logger, limit: 5})
            .then(function (data) {
                console.log("FINISHED:", data);
            });
        

        输出:

        LOG: [ 'page-0', 1446050705823 ]
        LOG: [ 'page-1', 1446050706327 ]
        LOG: [ 'page-2', 1446050706834 ]
        LOG: [ 'page-3', 1446050707334 ]
        LOG: [ 'page-4', 1446050707839 ]
        FINISHED: { pages: 5, total: 10, duration: 2520 }
        

        平衡序列接收器

        在下面的示例中,我们有一个sequence,它在索引小于 5 时返回数据,目标函数在处理从源解析的每个数据时强制延迟 1 秒。

        var $q = require('q');    
        var spex = require('spex')($q);
        
        function source(index, data, delay) {
            console.log("SOURCE:", index, data, delay);
            if (index < 5) {
                return $q.resolve(index);
            }
        }
        
        function dest(index, data, delay) {
            console.log("DEST:", index, data, delay);
            return new $q.Promise(function (resolve, reject) {
                setTimeout(function () {
                    resolve();
                }, 1000);
            });
        }
        
        spex.sequence(source, dest)
            .then(function (data) {
                console.log("DATA:", data);
            });
        

        输出:

        SOURCE: 0 undefined undefined
        DEST: 0 0 undefined
        SOURCE: 1 0 1011
        DEST: 1 1 1001
        SOURCE: 2 1 1001
        DEST: 2 2 1001
        SOURCE: 3 2 1000
        DEST: 3 3 1000
        SOURCE: 4 3 1001
        DEST: 4 4 1001
        SOURCE: 5 4 1000
        DATA: { total: 5, duration: 5013 }
        

        【讨论】:

          猜你喜欢
          • 2013-12-31
          • 1970-01-01
          • 2015-01-15
          • 2015-11-04
          • 2016-03-15
          • 1970-01-01
          • 2015-11-24
          • 2017-12-12
          • 1970-01-01
          相关资源
          最近更新 更多