【问题标题】:Execute promises concurrently with a buffer pool size in Javascript在 Javascript 中与缓冲池大小同时执行承诺
【发布时间】:2018-07-20 10:34:18
【问题描述】:

我有一个带有 promise 的函数,每次必须使用不同的参数执行 n 次。我想以一种脚本始终处理 3-4 个 Promise 的方式链接 Promise。

我是用 promise.all 实现的,这会同时执行 3 个,当所有的 promise 都解决后,它会继续执行下一个 3。

如何使它在 3 个中的一个解决时立即与另一个开始,但同时始终以最多 3 个工作?

for( var i = 0; i < tasks.length; i++){

    if( i > 0 && i%3 == 0 ){

      await Promise.all([
       doTaskFunction(tasks[i]),
        doTaskFunction(tasks[i-1]),
        doTaskFunction(tasks[i-2]),
      ]);
    }

  }

【问题讨论】:

标签: javascript node.js concurrency promise


【解决方案1】:

您可以使用es6-promise-pool 轻松实现这一目标:

const tasks = [
    (param) => new Promise(function(resolve, reject) {
        setTimeout(resolve, 2000, 'foo');
    }),
    () => new Promise(function(resolve, reject) {
        setTimeout(resolve, 2000, 'foo');
    }),
    () => new Promise(function(resolve, reject) {
        setTimeout(resolve, 2000, 'foo');
    }),
    () => Promise.resolve(1),
    () => Promise.resolve(2),
    () => Promise.resolve(3)
 ];
 
 let count = 1;

 const promiseProducer = () => {
    while(tasks.length) {
       console.log('processing ' + count++);
       const task = tasks.shift();
       return task(); // optionally you could pass a parameter here
    }
    
    return null;
 }
 
 const pool = new PromisePool(promiseProducer, 3); // concurrent Promises set to 3
 const poolPromise = pool.start();

 poolPromise.then(() => { console.log('done!'); })
&lt;script src="https://cdn.jsdelivr.net/npm/es6-promise-pool@2.5.0/es6-promise-pool.min.js"&gt;&lt;/script&gt;

【讨论】:

    【解决方案2】:

    我只是将我的天真与生成器实现留在这里! :)

    function* myIteratorFactory(arr) {
      for (let i = 0; i < arr.length; i++) {
        yield(arr[i])
      }
    }
    
    
    function delayPromise(text, ms) {
      return function() {
        return new Promise((resolve, reject) => {
          console.log('[%s] Promise with value %s just started', new Date().toISOString(), text)
          setTimeout(() => resolve(text), ms)
        }).then(() => console.log('[%s] Promise with value %s just ended', new Date().toISOString(), text))
      }
    }
    
    var promArr = [
      delayPromise('hi', 1500),
      delayPromise('alex', 2000),
      delayPromise('how', 1700),
      delayPromise('are', 1800),
      delayPromise('you', 1500),
    ]
    
    var que = 0
    var myIterator = myIteratorFactory(promArr)
    
    
    function executor(r) {
    
      while (que < 3) {
        var next = myIterator.next()
        if (next.done) return;
    
        next.value()
          .then(() => {
            que--
            executor(r)
            if (que == 0) r()
          })
        que++
      }
    
    
    
    }
    executor(() => console.log('i am done for today!'))

    【讨论】:

      【解决方案3】:

      可以用递归来解决。

      这个想法是,最初您发送最大允许数量的请求,并且这些请求中的每一个都应该在完成时递归地继续发送自己。

      function batchFetch(urls, concurrentRequestsLimit) {
          return new Promise(resolve => {
              var documents = [];
              var index = 0;
      
              function recursiveFetch() {
                  if (index === urls.length) {
                      return;
                  }
                  fetch(urls[index++]).then(r => {
                      documents.push(r.text());
                      if (documents.length === urls.length) {
                          resolve(documents);
                      } else {
                          recursiveFetch();
                      }
                  });
              }
      
              for (var i = 0; i < concurrentRequestsLimit; i++) {
                  recursiveFetch();
              }
          });
      }
      
      var sources = [
          'http://www.example_1.com/',
          'http://www.example_2.com/',
          'http://www.example_3.com/',
          ...
          'http://www.example_100.com/'
      ];
      batchFetch(sources, 5).then(documents => {
         console.log(documents);
      });
      

      【讨论】:

        【解决方案4】:

        如果您不想使用任何插件/依赖项,可以使用此解决方案。

        假设您的数据位于名为 datas 的数组中

        1. 创建一个函数来处理datas 数组中的数据,我们称之为processData()
        2. 创建一个函数,该函数将在 while 循环中一个接一个地执行 processData(),直到 datas 数组上没有剩余数据,让我们调用该函数 bufferedExecution()
        3. 创建大小为buffer_size 的数组
        4. bufferedExecution() 填充数组
        5. 并等待它在Promise.all()Promise.allSettled() 中解决

        这是一个工作示例,其中数据是数字,操作等待一段时间并返回数字的平方,它也随机拒绝。

        const datas = [1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13];
        // this datas array should not contain undefined values for this code to work
        
        const buffer_size = 3;
        const finishedPromises = [];
        
        // change this function to your actual function that processes data
        async function processData(item) {
          return new Promise((resolve, reject) => {
            // wait for some time
            setTimeout(() => {
              // randomly resolve or reject
              if (Math.random() > 0.5) {
                resolve(item ** 2);
              } else {
                reject("error message");
              }
            }, 1500);
          });
        }
        
        // this function executes one function per loop, but magic happens when you
        // execute the function below, multiple times
        async function bufferedExecution(callback, i) {
          return new Promise(async (resolve, reject) => {
            // take first vale to process
            let next = datas.shift();
            // check if there is a value, (undefined means you have reached the end of datas array)
            while (next != undefined) {
              // just to show which function is running (index of function in array)
              console.log(`running function id: ${i}`);
              let result;
              try {
                // process data with your function's callback
                result = await callback(next);
                // result finishes without error
                finishedPromises.push({
                  input: next,
                  result: result,
                });
              } catch (error) {
                // rejected, so adds error instead of result
                finishedPromises.push({
                  input: next,
                  error: error,
                });
              }
              // get next data from array and goes to next iteration
              next = datas.shift();
            }
            // once all that is done finish it
            resolve();
          });
        }
        
        // here is where the magic happens
        // we run the bufferedExecution function n times where n is buffer size
        // bufferedExecution runs concurrently because of Promise.all()/Promise.allsettled()
        const buffer = new Array(buffer_size)
          .fill(null)
          .map((_, i) => bufferedExecution(processData, i + 1));
        
        Promise.allSettled(buffer)
          .then(() => {
            console.log("all done");
            console.log(finishedPromises);
            // you will have your results in finishedPromises array at this point
            // you can use input KEY to get the actual processed value
            // first check for error, if not get the results
          })
          .catch((err) => {
            console.log(err);
          });

        输出

        // waits a while
        running function id: 1
        running function id: 2
        running function id: 3
        // waits a while
        running function id: 1
        running function id: 2
        running function id: 3
        // waits a while
        running function id: 1
        running function id: 2
        running function id: 3
        // waits a while
        running function id: 1
        running function id: 2
        running function id: 3
        // waits a while
        running function id: 1
        all done
        [
          { input: 1, error: 'error message' },
          { input: 2, result: 4 },
          { input: 3, result: 9 },
          { input: 4, result: 16 },
          { input: 5, error: 'error message' },
          { input: 6, result: 36 },
          { input: 7, result: 49 },
          { input: 8, error: 'error message' },
          { input: 9, result: 81 },
          { input: 10, result: 100 },
          { input: 11, result: 121 },
          { input: 12, error: 'error message' },
          { input: 13, result: 169 }
        ]
        

        【讨论】:

          猜你喜欢
          • 1970-01-01
          • 2021-05-14
          • 1970-01-01
          • 1970-01-01
          • 1970-01-01
          • 2020-08-10
          • 2016-02-01
          • 1970-01-01
          • 2015-09-16
          相关资源
          最近更新 更多