【问题标题】:Async Concurrent Queue with max concurrency最大并发的异步并发队列
【发布时间】:2022-02-24 12:06:13
【问题描述】:

我遇到了一个自定义异步队列的错误,该队列一次调用 10 个异步函数。

我正在使用 50 个作业启动队列,一旦前 10 个作业完成,队列将移动到随后的 10 个作业,直到全部完成。

我遇到的错误是,一旦完成 50,它会重新启动前 5 个作业,一次有 2 个或 3 个或 1 个作业。队列末尾也需要不到 10 个作业。

请创建这两个文件并使用 mocha 进行测试,然后自己查看输出。

注意:将 mocha 中的超时设置为 0,以保持测试长时间运行。

Queue.js

function Queue(func, max) {
    this.jobs = [];
    this.func = func;
    this.max = max ? max : 10;
}

Queue.prototype.push = function(data) {
    var self = this;
    return new Promise(function(resolve, reject){
        self.jobs.push({data: data, resolve: resolve, reject: reject});
        if(!self.progress) {
            self.progress = true;
            self.run();
        }
    });
};

Queue.prototype.run = function() {
    var self = this;
    var tasks = [];

    console.log("--------------------");

    for(var i=0; i<this.jobs.length && i < this.max; i++) {
        tasks.push(this.jobs.shift());
        console.log("queuing", tasks[tasks.length-1].data);
    }
    console.log("Total jobs queued", tasks.length);

    Promise.all(
        tasks.map(function(task){
            return self.func(task.data)
                .then(task.resolve, task.reject);
        }
    )).then(this.next.bind(this));
};

Queue.prototype.next = function(){
    if(this.jobs.length) {
        this.run();
    } else {
        this.progress = false;
    }
};

module.exports = Queue;

QueueTest.js

function async(data) {
    return new Promise(function(resolve, reject){
        setTimeout(function(){
            console.log("resolving", data);
            resolve(data);
        }, Math.random() * 5000);
    });
}

it("should test queue", function(done){
    var queue = new Queue(async);
    Promise.all(
        [0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20, 21, 22, 23, 24, 25, 26, 27, 28, 29,
            30, 31, 32, 33, 34, 35, 36, 37, 38, 39, 40, 41, 42, 43, 44, 45, 46, 47, 48, 49, 50].map(queue.push.bind(queue))
    ).then(function(){
        done();
    });
});

【问题讨论】:

  • 您是否尝试过在作业完成后从队列中删除作业,然后继续检查队列是否为空?典型的队列。或者一旦作业完成,从原始队列中移除,并将其移动到 jobDoneQueue。
  • run 函数中的这一行tasks.push(this.jobs.shift()); 基本上就是这样做的,tasks 是run 函数中的局部变量,只存在于作用域内,并且已将作业排入队列。 shift 将它们从主队列中移除。
  • @NalinAgrawal,你能运行并测试这段代码吗?

标签: javascript node.js asynchronous promise


【解决方案1】:

问题在于Queue.prototype.run 中的for 循环。

我无法立即明白为什么它会出现这种行为,但解决方法是将 for 循环替换为 self.jobs.splice() 以创建 tasks 数组。

Queue.prototype.run = function() {
    console.log("--------------------");
    var self = this;
    var tasks = self.jobs.splice(0, self.max); // <<<<<<<< this is the fix
    console.log("tasks:", tasks.map(obj => obj.data));

    Promise.all(
        tasks.map(function(task){
            return self.func(task.data)
            .then(task.resolve, task.reject);
        }
    )).then(this.next.bind(this));
};

没有什么需要改变的。

【讨论】:

  • 哇,太好了。至少我们可以得出结论,问题是由于for 循环和splice 确实解决了问题。顺便说一句,thisself 都可以。我无法理解for 循环如何成为罪魁祸首。
  • 我也不知道。我很幸运,splice 是我尝试的第一件事。
  • 无论问题是什么,这都是一个简单的同步问题。我的修复与 promises 或 then()ing 没有任何关系。
  • 大声笑,我在想可能会发生某种关闭,并且在提供值时正在读取作业数组的脏长度值(值由 QueueTest 类中的 Promise.all 同时出现)这只是提供一些思考的猜测:)
  • 对于 Bluebird,它是带有并发选项的 Promise.map(),而不是 Promise.each() - 抱歉,我的错。所以使用Promise.map() 你可以写Promise.map([0, 1, 2, 3, ..., 50], async, {concurrency:10}).then(function() { console.log('... Done'); }); - 不需要Queue()。但是,它与您的Queue() 不同,后者允许在队列开始运行后推送更多数据元素。如果这是一个重要功能,并且您希望像 Bluebird 这样的并发性,那么调整 .run() 方法以在每个较早的 Promise 完成时保持一个运行中的 Promise 池。
【解决方案2】:

问题是for 循环中的这种情况:i&lt;this.jobs.length

i 正在计算批处理中计划的作业数。当i 是作业数组中的索引时,此条件是正确的。在这种情况下,我们只是想确认还有作业需要处理,所以我们可以简单地使用:this.jobs.length&gt;0

队列末尾的奇怪行为是因为随着元素从作业数组中移出,长度正在下降,但该批次中安排的作业数量 (i) 正在增加。以this.jobs.length进入for循环时为4为例:

i this.jobs.length i < this.jobs.length
0 4 true
1 3 true
2 2 false

在这种情况下,仅调度队列中四个作业中的两个后退出循环。检查是否还有剩余任务可以解决问题:

for(var i=0; this.jobs.length > 0 && i < this.max; i++) {

【讨论】:

    【解决方案3】:

    从数组并行调度任务,无需等待任何线程在允许的线程内完成

    
    const fastQueue = async <T, Q>(
      x: T[],
      threads: number,
      fn: (v: T, i: number, a: T[]) => Promise<Q>
    ) => {
      let k = 0;
      const result = Array(x.length) as Q[];
      await Promise.all(
        [...Array(threads)].map(async () => {
          while (k < x.length) result[k] = await fn(x[k], k++, x);
        })
      );
      return result;
    };
    
    const demo = async () => {
        const wait = (x: number) => new Promise(r => setTimeout(r, x, x))
        console.time('a')
        console.log(await fastQueue([1000, 2000, 3000, 2000, 2000], 4, (v) => wait(v)))
        console.timeEnd('a')
    }
    demo();
    
    
    

    【讨论】:

      猜你喜欢
      • 2011-08-01
      • 1970-01-01
      • 2017-06-27
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 2019-12-05
      • 1970-01-01
      相关资源
      最近更新 更多