【问题标题】:Call an array of promises in parallel, but resolve them in order without waiting for other promises to resolve并行调用一组promise,但按顺序解决它们而不等待其他promise解决
【发布时间】:2019-02-10 20:05:54
【问题描述】:

我有一系列要并行调用的 promise,但要同步解决。

我编写了这段代码来完成所需的任务,但是,我需要创建自己的对象 QueryablePromise 来包装本机 Promise,我可以同步检查它的已解决状态。

有没有更好的方法来完成这个不需要特殊对象的任务?

请注意。我不想使用Promise.all,因为我不想在处理承诺的效果之前等待所有承诺解决。而且我不能在我的代码库中使用async 函数。

const PROMISE = Symbol('PROMISE')
const tap = fn => x => (fn(x), x)

class QueryablePromise {
  resolved = false
  rejected = false
  fulfilled = false
  constructor(fn) {
    this[PROMISE] = new Promise(fn)
      .then(tap(() => {
        this.fulfilled = true
        this.resolved = true
      }))
      .catch(x => {
        this.fulfilled = true
        this.rejected = true
        throw x
      })
  }
  then(fn) {
    this[PROMISE].then(fn)
    return this
  }
  catch(fn) {
    this[PROMISE].catch(fn)
    return this
  }
  static resolve(x) {
    return new QueryablePromise((res) => res(x))
  }
  static reject(x) {
    return new QueryablePromise((_, rej) => rej(x))
  }
}

/**
 * parallelPromiseSynchronousResolve
 *
 * Call array of promises in parallel but resolve them in order
 *
 * @param  {Array<QueryablePromise>}  promises
 * @praram {Array<fn>|fn}  array of resolver function or single resolve function
 */
function parallelPromiseSynchronousResolve(promises, resolver) {
  let lastResolvedIndex = 0
  const resolvePromises = (promise, i) => {
    promise.then(tap(x => {
      // loop through all the promises starting at the lastResolvedIndex
      for (; lastResolvedIndex < promises.length; lastResolvedIndex++) {
        // if promise at the current index isn't resolved break the loop
        if (!promises[lastResolvedIndex].resolved) {
          break
        }
        // resolve the promise with the correct resolve function
        promises[lastResolvedIndex].then(
          Array.isArray(resolver)
            ? resolver[lastResolvedIndex]
            : resolver
        )
      }
    }))
  }
  
  promises.forEach(resolvePromises)
}

const timedPromise = (delay, label) => 
  new QueryablePromise(res => 
    setTimeout(() => {
      console.log(label)
      res(label)
    }, delay)
  )

parallelPromiseSynchronousResolve([
  timedPromise(20, 'called first promise'),
  timedPromise(60, 'called second promise'),
  timedPromise(40, 'called third promise'),
], [
  x => console.log('resolved first promise'),
  x => console.log('resolved second promise'),
  x => console.log('resolved third promise'),
])
&lt;script src="https://codepen.io/synthet1c/pen/KyQQmL.js"&gt;&lt;/script&gt;

为任何帮助干杯。

【问题讨论】:

  • 考虑创建一个 Promise 数组,其中每个 Promise 都包含其效果并将该数组传递给 Promise.all
  • @cartant 谢谢你的评论,你有一个例子说明它是如何工作的吗?
  • “有没有更好的方法来完成这个不需要特殊对象的任务?”“更好”是什么意思 ?为什么您认为需要“特殊对象”
  • 我相信特殊对象是必需的,因为我需要与上面的代码同步检查承诺的状态,你不能用原生 Promise 做到这一点,因此我需要用 @987654329 包装它@ 这使我能够在回调中检查 promise.resolved。我的意思是不必包装承诺更好,我想知道是否有功能性 js 代码约定或特定对象可以完成任务,或者是否有一个经过测试的库可以用来执行相同的任务。
  • @guest271314 实际上我认为你是对的,我用QueryablePromise 使代码过于复杂我只是在内部使用了一个数组来保存结果,然后检查该数组的承诺状态而不是拥有承诺本身的状态。

标签: javascript asynchronous promise synchronous


【解决方案1】:

使用for await...of 循环,如果你已经拥有一组promise,你可以很好地做到这一点:

const delay = ms => new Promise(resolve => { setTimeout(resolve, ms); });
const range = (length, mapFn) => Array.from({ length }, (_, index) => mapFn(index));

(async () => {
  const promises = range(5, index => {
    const ms = Math.round(Math.random() * 5000);
    return delay(ms).then(() => ({ ms, index }));
  });

  const start = Date.now();

  for await (const { ms, index } of promises) {
    console.log(`index ${index} resolved at ${ms}, consumed at ${Date.now() - start}`);
  }
})();

由于您不能使用异步函数,您可以通过使用Array.prototype.reduce() 将promise 链接在一起来模仿for await...of 的效果,并为每个链同步调度回调:

const delay = ms => new Promise(resolve => { setTimeout(resolve, ms); });
const range = (length, mapFn) => Array.from({ length }, (_, index) => mapFn(index));

const asyncForEach = (array, cb) => array.reduce(
  (chain, promise, index) => chain.then(
    () => promise
  ).then(
    value => cb(value, index)
  ),
  Promise.resolve()
);

const promises = range(5, index => {
  const ms = Math.round(Math.random() * 5000);
  return delay(ms).then(() => ms);
});

const start = Date.now();

asyncForEach(promises, (ms, index) => {
  console.log(`index ${index} resolved at ${ms}, consumed at ${Date.now() - start}`);
});

错误处理

由于承诺被声明为并行实例化,我假设任何单个承诺上的错误不会传播到其他承诺,除非通过asyncForEach()(如上)构造的任何潜在脆弱链。

但我们也希望在 asyncForEach() 中将它们链接在一起时避免在 Promise 之间交叉传播错误。这是一种稳健地安排错误回调的方法,其中错误只能从原始承诺传播:

const delay = ms => new Promise(resolve => { setTimeout(resolve, ms); });
const maybe = p => p.then(v => Math.random() < 0.5 ? Promise.reject(v) : v);
const range = (length, mapFn) => Array.from({ length }, (_, index) => mapFn(index));

const asyncForEach = (array, fulfilled, rejected = () => {}) => array.reduce(
  (chain, promise, index) => {
    promise.catch(() => {}); // catch early rejection until handled below by chain
    return chain.then(
      () => promise,
      () => promise // catch rejected chain and settle with promise at index
    ).then(
      value => fulfilled(value, index),
      error => rejected(error, index)
    );
  },
  Promise.resolve()
);

const promises = range(5, index => {
  const ms = Math.round(Math.random() * 5000);
  return maybe(delay(ms).then(() => ms)); // promises can fulfill or reject
});

const start = Date.now();

const settled = state => (ms, index) => {
  console.log(`index ${index} ${state}ed at ${ms}, consumed at ${Date.now() - start}`);
};

asyncForEach(
  promises,
  settled('fulfill'),
  settled('reject') // indexed callback for rejected state
);

这里唯一需要注意的是,在传递给asyncForEach() 的回调中抛出的任何错误都将被链中的错误处理吞没,除了在数组最后一个索引的回调中抛出的错误。

【讨论】:

  • 嗨帕特里克,是的,我也注意到了这一点并将其添加到您的 asyncForEach 函数中,并且我添加了使用单个回调或单个回调处理每个承诺响应并将整个函数包装在 @ 987654332@ 这样我就可以在所有承诺解决后继续这个链。感谢您的帮助。
  • 有趣的是,我无法在 Typescript 中完成这项工作,它无法预测已解决('rejected')和带有 2 个参数的错误。
【解决方案2】:

我建议确实使用Promise.all - 但不是一次用于所有承诺,而是您希望在每个步骤中实现的所有承诺。您可以使用reduce 创建这个承诺的“树列表”:

function parallelPromisesSequentialReduce(promises, reducer, initial) {
  return promises.reduce((acc, promise, i) => {
    return Promise.all([acc, promise]).then(([prev, res]) => reducer(prev, res, i));
  }, Promise.resolve(initial));
}

const timedPromise = (delay, label) => new Promise(resolve =>
  setTimeout(() => {
    console.log('fulfilled ' + label + ' promise');
    resolve(label);
  }, delay)
);

parallelPromisesSequentialReduce([
  timedPromise(20, 'first'),
  timedPromise(60, 'second'),
  timedPromise(40, 'third'),
], (acc, res) => {
  console.log('combining ' + res + ' promise with previous result (' + acc + ')');
  acc.push(res);
  return acc;
}, []).then(res => {
  console.log('final result', res);
}, console.error);

【讨论】:

  • 有趣的方法!介意我编辑我的asyncForEach() 实现以使用reduce() 而没有map() 就像你在这里一样吗?
  • 我继续按照我的要求进行了更改。如果您觉得我的编辑与您的答案没有足够的不同,请随时恢复它。
  • @PatrickRoberts 我很高兴你喜欢它:-) 实际上我觉得你的更新还不够相似:如果promisechain 之前拒绝,chain.then(() =&gt; promise) 会导致未处理的拒绝警告满足 - 真的应该使用Promise.all
  • 查看我的答案的第二部分以进行正确的错误处理。无论chainpromise 解决的顺序如何,拒绝仍然由以下then() 调用的第二个参数处理。
  • @PatrickRoberts 好的,我不确定其意图是什么,但如果您想稍后处理异常(按数组顺序),那么您应该使用明确的 promise.catch(e =&gt; {/* ignore til later*/}); 抑制潜在警告.
猜你喜欢
  • 2018-11-19
  • 1970-01-01
  • 2019-10-09
  • 2020-06-23
  • 1970-01-01
  • 1970-01-01
  • 2016-04-13
  • 2017-06-21
  • 1970-01-01
相关资源
最近更新 更多