您可以使用Queue 数据结构作为基础,并在子类中添加特殊行为。 Queue 有两个方法的众所周知的接口enqueue()(添加新项目到结束)和dequeue()(删除第一个项目)。在您的情况下,dequeue() 等待异步任务。
特殊行为:
- 每次新任务(例如
fetch('url'))入队,this.dequeue() 都会被调用。
-
dequeue() 做了什么:
- 如果队列为空 ➜
return false(脱离递归)
- 如果队列忙 ➜
return false(上一个任务未完成)
- else ➜ 从队列中移除 first 任务并运行它
- 任务“完成”(成功或有错误)➜ 递归调用
dequeue() (2.),直到队列为空..
class Queue {
constructor() { this._items = []; }
enqueue(item) { this._items.push(item); }
dequeue() { return this._items.shift(); }
get size() { return this._items.length; }
}
class AutoQueue extends Queue {
constructor() {
super();
this._pendingPromise = false;
}
enqueue(action) {
return new Promise((resolve, reject) => {
super.enqueue({ action, resolve, reject });
this.dequeue();
});
}
async dequeue() {
if (this._pendingPromise) return false;
let item = super.dequeue();
if (!item) return false;
try {
this._pendingPromise = true;
let payload = await item.action(this);
this._pendingPromise = false;
item.resolve(payload);
} catch (e) {
this._pendingPromise = false;
item.reject(e);
} finally {
this.dequeue();
}
return true;
}
}
// Helper function for 'fake' tasks
// Returned Promise is wrapped! (tasks should not run right after initialization)
let _ = ({ ms, ...foo } = {}) => () => new Promise(resolve => setTimeout(resolve, ms, foo));
// ... create some fake tasks
let p1 = _({ ms: 50, url: '❪?❫', data: { w: 1 } });
let p2 = _({ ms: 20, url: '❪?❫', data: { x: 2 } });
let p3 = _({ ms: 70, url: '❪?❫', data: { y: 3 } });
let p4 = _({ ms: 30, url: '❪?❫', data: { z: 4 } });
const aQueue = new AutoQueue();
const start = performance.now();
aQueue.enqueue(p1).then(({ url, data }) => console.log('%s DONE %fms', url, performance.now() - start)); // = 50
aQueue.enqueue(p2).then(({ url, data }) => console.log('%s DONE %fms', url, performance.now() - start)); // 50 + 20 = 70
aQueue.enqueue(p3).then(({ url, data }) => console.log('%s DONE %fms', url, performance.now() - start)); // 70 + 70 = 140
aQueue.enqueue(p4).then(({ url, data }) => console.log('%s DONE %fms', url, performance.now() - start)); // 140 + 30 = 170
互动演示:
完整代码演示:https://codesandbox.io/s/async-queue-ghpqm?file=/src/index.js
您可以在控制台和/或开发工具的“性能”选项卡中玩耍并观察结果。这个答案的其余部分是基于它的。
说明:
enqueue() 返回一个新 Promise,它将在以后的某个时间解决(或拒绝)。这个Promise 可用于处理您的async 任务Fn 的响应。
enqueue() 实际上是 push() 和 Object 进入队列,持有任务 Fn 和返回的 Promise 的控制方法。
自从解包返回Promise insta。开始运行,this.dequeue() 每次我们排队一个新任务时都会被调用。
将一些performance.measure() 添加到我们的task,我们可以很好地可视化我们的队列:
(*.gif 动画)
-
第一行是我们的队列实例
- 新入队的
tasks 有一个“❚❚ 等待..”期间(第 3 行)(如果队列为空,则可能是 < 1ms)
- 在某些时候,它会出队并“▶ 运行..”一段时间(第 2 行)
日志输出(console.table()):
说明:
第一个task 是enqueue()d 在队列初始化后的2.58ms。
由于我们的队列是空的,所以没有❚❚ waiting (0.04ms➜ ~40μm)。
任务运行时13.88ms ➜ dequeue
Queue 类只是原生Array Fn´s 的包装器!
您当然可以在一个类中实现这一点。我只是想表明,你可以从已知的数据结构中构建你想要的东西。不使用Array 有一些很好的理由:
-
Queue 数据结构由两个公共方法的接口 定义。使用Array 可能会诱使其他人在其上使用本机Array 方法,例如.reverse(),.. 这会破坏definition。
-
enqueue() 和 dequeue() 比 push() 和 shift() 更具可读性
- 如果您已经有一个外部实现的
Queue 类,您可以从它扩展(可重用代码)
- 您可以将
class Queue 中的 Array 项替换为其他数据结构:“Doubly Linked List”可以降低 Array.shift() 的 代码复杂度,从 O(n) [线性] 到 O(1) [常数]。 (➜ 比原生数组 Fn 的时间复杂度更高!)(➜ 最终演示)
代码限制
这个AutoQueue 类不限于async 函数。它处理任何事情,可以像await item[MyTask](this) 一样调用:
-
let task = queue => {..} ➜ 同步函数
-
let task = async queue => {..} ➜ 异步函数
-
let task = queue => new Promise(resolve => setTimeout(resolve, 100) ➜ new Promise()
注意:我们已经用await 调用我们的任务,其中await 包装任务的响应到Promise。
Nr 2. (async 函数),总是自己返回一个Promise,而await 调用只是将一个Promise 包装到另一个Promise 中,效率稍低。
Nr 3. 很好。返回的 Promise 不会被await
包裹
这是异步函数的执行方式:(source)
- 异步函数的结果始终是 Promise
p。该 Promise 是在开始执行异步函数时创建的。
- 主体被执行。执行可以通过 return 或 throw 永久结束。或者它可以通过 await 暂时完成;在这种情况下,通常会在稍后继续执行。
- Promise
p 被返回。
以下代码演示了它是如何工作的:
async function asyncFunc() {
console.log('asyncFunc()'); // (A)
return 'abc';
}
asyncFunc().
then(x => console.log(`Resolved: ${x}`)); // (B)
console.log('main'); // (C)
// Output:
// asyncFunc()
// main
// Resolved: abc
您可以依赖以下顺序:
- 行 (A):异步函数同步启动。异步函数的 Promise 通过 return 解决。
- 第 (C) 行:继续执行。
- 第 (B) 行:Promise 解决通知异步发生。
阅读更多:“Callable values”
阅读更多:“Async functions”
性能限制
由于AutoQueue 仅限于处理一个任务在另一个之后,它可能会成为我们应用程序的瓶颈。限制因素是:
-
每次任务数: ➜
enqueue()d 新任务的频率。
-
每个任务的运行时间 ➜
dequeue() 中的阻塞时间,直到任务完成
1。每次任务
这是我们的责任!我们可以随时获取queue 的当前大小:size = queue.size。您的外部脚本需要一个“故障转移”案例来处理稳定增长的队列(查看“Stacked wait times”部分)。
您想避免像这样的“队列溢出”,其中平均/平均值 waitTime 会随着时间的推移而增加。
+-------+----------------+----------------+----------------+----------------+
| tasks | enqueueMin(ms) | enqueueMax(ms) | runtimeMin(ms) | runtimeMax(ms) |
| 20 | 0 | 200 | 10 | 30 |
+-------+----------------+----------------+----------------+----------------+
- ➜ 任务
20/20 等待 195ms 直到 exec 开始
- ➜ 从我们最后一个任务随机入队开始,又需要 + ~232ms,直到所有任务都得到解决。
2。每个任务的运行时间
这个比较难处理。 (等待fetch()无法改善,需要等到HTTP请求完成)。
也许您的fetch() 任务依赖于彼此的响应,并且运行时间过长会阻塞其他任务。
但是我们可以做一些事情:
-
也许我们可以缓存响应 ➜ 减少下一次排队的运行时间。
-
也许我们来自 CDN 的 fetch() 并且有一个我们可以使用的替代 URI。在这种情况下,我们可以从task 返回一个new Promise,它将在下一个task 是enqueue()d 之前运行。 (参见“错误处理”):
queue.enqueue(queue => Promise.race(fetch('url1'), fetch('url2')));
-
也许您有某种“long polling”或周期性 ajax task,每 x 秒运行一次,无法缓存。即使您不能减少运行时间本身,您也可以记录运行时间,这会给您一个近似值。估计下一次运行。也许可以将长时间运行的任务交换到其他队列实例。
平衡AutoQueue
什么是“高效”Queue? - 你的第一个想法可能是这样的:
最高效的Queue在最短的时间内处理最多的tasks?
既然我们无法改进task 运行时,我们可以缩短等待时间吗?该示例是一个queue,任务之间的等待时间零 (~0ms)。
提示:为了比较我们的下一个示例,我们需要一些不会改变的 base 统计数据:
+-------+----------------+----------------+------------------+------------------+
| count | random fake runtime for tasks | random enqueue() offset for tasks |
+-------+----------------+----------------+------------------+------------------+
| tasks | runtimeMin(ms) | runtimeMax(ms) | msEnqueueMin(ms) | msEnqueueMax(ms) |
| 200 | 10 | 30 | 0 | 4000 |
+-------+----------------+----------------+------------------+------------------+
Avg. task runtime: ⇒ (10ms + 30ms) / 2 = 20ms
Total time: ⇒ 20ms * 200 = 4000ms ≙ 4s
➜ We expect our queue to be resolved after ~4s
➜ For consistent enqueue() frequency we set msEnqueueMax to 4000
-
AutoQueue 在 ~4.12s 之后最后完成 dequeue()(^^ 参见工具提示)。
-
~120ms 比我们预期的 4s 长:
提示:每个任务 ~0.3ms 之后都有一个小的“日志”块”,我在其中构建/推送带有日志标记的 Object 到最后的 console.table() 日志的全局“数组”。这个解释了200 * 0.3ms = 60ms.. 丢失的60ms 未被跟踪(您会看到任务之间的小差距)-> 0.3ms/task 用于我们的测试循环,并且可能会因打开开发工具而延迟,..
我们稍后会回到这些时间。
我们queue的初始化代码:
const queue = new AutoQueue();
// .. get 200 random Int numbers for our task "fake" runtimes [10-30]ms
let runtimes = Array.from({ length: 200 }, () => rndInt(10, 30));
let i = 0;
let enqueue = queue => {
if (i >= 200) {
return queue; // break out condition
}
i++;
queue
.enqueue(
newTask({ // generate a "fake" task with of a rand. runtime
ms: runtimes[i - 1],
url: _(i)
})
)
.then(payload => {
enqueue(queue);
});
};
enqueue(queue); // start recurion
我们递归地enqueue()我们的下一个任务,就在前一个任务完成之后。您可能已经注意到 典型 Promise.then() 链的类比,对吧?
提示:如果我们已经知道tasks 的顺序和总数,我们不需要Queue 来按顺序运行。我们可以使用Promise 链并获得相同的结果。
有时我们在脚本开始时并不知道所有后续步骤..
..您可能需要更多灵活性,而我们要运行的下一个任务取决于上一个task的响应。 - 也许您的应用程序依赖于 REST API(多个端点),并且您被限制为最多 X 个同时 API 请求。我们不能使用来自您应用程序的所有请求向 API 发送垃圾邮件。你甚至不知道下一个请求何时收到enqueue()d(例如,API 请求是由click() 事件触发的?...
好的,对于下一个示例,我稍微更改了初始化代码:
我们现在在 [0-4000ms] 时间内随机将 200 个任务排入队列。 - 公平地说,我们将范围缩小了30ms(最大任务运行时间)到 [0-3970ms]。现在我们的随机填充队列有机会保持在4000ms limit内。
我们可以得到什么或 Dev-Tools 性能登录:
- 随机
enqueue() 导致大量“等待”任务。
有道理,因为我们在第一个 ~4000ms 中将所有任务排入队列,它们必须以某种方式重叠。检查表输出,我们可以验证:Max queue.size 是 22,此时任务 170/200被排队。
- 等待任务分布不均。刚开始后,甚至还有一些 idle 部分。
由于enqueue() 是随机的,我们的第一个任务不太可能获得0ms 偏移量。
~20ms 每个任务的运行时都会随着时间的推移导致堆叠效果。
- 我们可以按“等待毫秒”对任务进行排序(见屏幕):最长的等待时间是
>400ms。
queue.size(列:sizeOnAdd)和wait ms(见下一节)之间可能存在关系。
- 我们的
AwaitQueue 在其初始化后最后完成dequeue() ~4.37s(检查“性能”选项卡中的工具提示)。 20,786ms / task(预期:20ms)的平均运行时间为我们提供了4157.13ms(预期:4000ms≙4s)的总运行时间。
我们仍然有我们的“日志”块和 exec。我们的测试脚本的时间它自己~120ms。还是 ~37ms 更长的时间?一开始就总结所有闲置的“差距”解释了缺少的~37ms
回到我们最初的“定义”
最高效的Queue在最短的时间内处理最多的tasks?
假设:除了随机偏移,上例中tasks得到enqueue()d,两个队列处理相同的数字 的tasks(平均运行时间)在同一时间段内。入队的task 的等待 时间和queue.size 都不会影响总运行时间。两者效率一样吗?
由于Queue 本质上会缩小我们的编码可能性,因此如果我们谈论高效代码(每次任务数),最好不要使用Queue。
队列帮助我们将异步环境中的任务理顺成同步模式。这正是我们想要的。 ➜ “在一行中运行一个未知的任务序列”。
如果您发现自己在问这样的问题:“如果新的 task 被排入已经填充的队列,我们必须等待我们的结果,其他人的运行时间增加了。那效率较低!”。
那你就做错了:
- 您要么将彼此不依赖(以某种方式)相互依赖(逻辑或程序依赖)的任务排入队列,要么存在不会增加脚本总运行时间的依赖。 - 无论如何,我们必须等待其他人。
堆叠wait 次
我们已经看到一个任务在运行之前的wait 峰值时间461.05ms。如果我们能够在决定将其入队之前预测任务的 wait 时间,那不是很好吗?
首先,我们分析AutoQueue 类在较长时间内的行为。
(重新发布屏幕)
我们可以根据console.table() 的输出构建图表:
除了task的wait时间,我们可以看到随机的[10-30ms]runtime和3条曲线,代表当前的queue.size,记录在task的时间..
- .. 是
enqueued()
- .. 开始运行。 (
dequeue())
- .. 任务完成(就在下一个
dequeue() 之前)
再运行 2 次进行比较(趋势相似):
我们能找到彼此之间的依赖关系吗?
如果我们能找到这些记录的图表线之间的关系,它可能有助于我们了解 queue 在一段时间内的行为(➜ 不断地被新任务填满)。
Exkurs:什么是关系?
我们正在寻找将wait ms 曲线投影 到 3 个queue.size 记录之一的方程。 这将证明两者之间存在直接依赖关系。
在上次运行时,我们更改了启动参数:
-
任务数:200 ➜ 1000 (5x)
-
msEnqueueMax:4000ms ➜ 20000ms (5x)
+-------+----------------+----------------+------------------+------------------+
| count | random fake runtime for tasks | random enqueue() offset for tasks |
+-------+----------------+----------------+------------------+------------------+
| tasks | runtimeMin(ms) | runtimeMax(ms) | msEnqueueMin(ms) | msEnqueueMax(ms) |
| 1000 | 10 | 30 | 0 | 20000 |
+-------+----------------+----------------+------------------+------------------+
Avg. task runtime: ⇒ (10ms + 30ms) / 2 = 20ms (like before)
Total time: ⇒ 20ms * 1000 = 20000ms ≙ 20s
➜ We expect our queue to be resolved after ~20s
➜ For consistent enqueue() frequency we set msEnqueueMax to 20000
(互动图表:https://datawrapper.dwcdn.net/p4ZYx/2/)
我们看到了同样的趋势。 wait ms 随着时间的推移而增加(没有什么新东西)。由于底部的 3 条 queue.size 线被绘制到同一个图表中(Y 轴有 ms 刻度),它们几乎不可见。快速切换到对数刻度以便更好地进行比较:
(互动图表:https://datawrapper.dwcdn.net/lZngg/1/)
queue.size [on start] 和 queue.size [on end] 的两条虚线几乎相互重叠,一旦我们的队列为空,最后就会下降到“0”。
queue.size [on add] 看起来与wait ms 行非常相似。这就是我们所需要的。
{queue.size [on add]} * X = {wait ms}
⇔ X = {wait ms} / {queue.size [on add]}
仅此一项在运行时对我们没有帮助,因为 wait ms 对于新的排队任务(尚未运行)是未知的。所以我们还有 2 个未知变量:X 和 wait ms。我们需要其他关系来帮助我们。
首先,我们将新的日粮{wait ms} / {queue.size [on add]} 打印到图表中(浅绿色)及其平均值(浅绿色水平虚线)。这非常接近20ms(我们的任务平均为run ms),对吧?
切换回linear Y 轴并将其“最大比例”设置为80ms 以获得更好的视图。 (提示:wait ms 现在超出了视口)
(互动图表:https://datawrapper.dwcdn.net/Tknnr/4/)
回到我们任务的随机运行时间(点云)。我们仍然有我们的“总平均值”20.72ms(深绿色虚线水平)。我们还可以计算我们之前任务在运行时的平均值(例如,任务 370 入队 ➜ 任务 [1,..,269] 的当前平均运行时间 = 平均运行时间)。但我们甚至可以更精确:
我们排队的任务越多,它们对总“平均运行时间”的影响就越小。因此,让我们计算 last 的“平均运行时间”,例如50tasks。对于“平均运行时间”,这导致每个任务的一致影响为 1/50。 ➜ 峰值运行时间变直,趋势(上升/下降)被考虑在内。 (我们 1. 方程中浅绿色旁边的深绿色水平路径曲线)。
我们现在可以做的事情:
-
我们可以从第一个方程(浅绿色)中消除X。 ➜ X 可以表示为“先前n 的平均运行时间”,例如 50 个任务(深绿色)。
我们的新方程只依赖于在运行时已知的变量,就在入队点:
// mean runtime from prev. n tasks:
X = {[taskRun[-50], .. , taskRun[-2], taskRun[-1] ] / n } ms
// .. replace X in 1st equation:
⇒ {wait ms} = {queue.size [on add]} * {[runtime[-50], .. , runtime[-2], runtime[-1] ] / n } ms
-
我们可以在我们的图表上绘制一条新的图表曲线,并检查它与记录的wait ms(橙色)相比有多接近
(互动图表:https://datawrapper.dwcdn.net/LFp1d/2/)
结论
我们可以在任务入队之前预测它的wait,因为我们的任务的运行时间可以通过某种方式确定。因此,它在您将相同类型/功能的任务排入队列的情况下效果最佳:
用例:AutoQueue 实例填充了 UI 组件的渲染任务。渲染时间可能不会对聊天有太大影响(与 fetch() 相比)。也许您在地图上渲染 1000 个位置标记。每个标记都是一个带有render() Fn 的类的实例。
提示
-
Queues 用于各种任务。 ➜ 为不同类型的逻辑实现专用的Queue 类变体(不要在一个类中混合不同的逻辑)
- 检查所有可能排入同一
AutoQueue实例(现在或将来)的tasks,它们可能被所有其他阻止。
-
AutoQueue 不会提高运行时间,最多也不会降低。
- 为不同的
Task 类型使用不同的AutoQueue 实例。
- 监控
AutoQueue的大小,特别是..
- .. 大量使用(
enqueue() 频繁使用)
- .. 在 long 或 unknown
task 运行时
- 检查您的错误处理。由于您的
tasks 中的错误将只是reject 他们返回的入队承诺(promise = queue.enqueue(..))并且不会停止出队过程。你可以处理错误..
- .. 在你的任务中 ➜ `try{..} catch(e){ .. }
- .. 在它之后(在下一个之前)➜
return new Promise()
- ..“异步”➜
queue.enqueue(..).catch(e => {..})
- .. "global" ➜
AutoQueue 类中的错误处理程序
- 根据
Queue 的实现,您可能会看到queue.size。 Array 填充了 1000 个任务,不如我在最终代码中使用的“双向链表”等分散数据结构有效。
- 避免递归地狱。 (可以使用
tasks 和enqueue() 其他人)- 但是,调试AutoQueue 并不有趣,其中tasks 在async 环境中被其他人动态地enqueue()e..李>
- 乍一看
Queue 可能会解决一个问题(在某个抽象级别上)。但是,在大多数情况下,它会缩小现有的灵活性。它为我们的代码添加了一个额外的“控制层”(在大多数情况下,这正是我们想要的),同时,我们签署了一份合同以接受Queue 的严格规则。即使解决了问题,也未必是最好的解决方案。
添加更多功能[基本]
-
在enqueue() 上停止“自动dequeue()”:
由于我们的AutoQueue 类是通用的并且不限于长时间运行 HTTP requests(),您可以enqueue() 任何必须按顺序运行的函数,甚至3min 运行函数,例如“存储模块更新”,.. 你不能保证,当你在循环中enqueue() 100 个任务时,上一个添加的任务还不是 dequeued()。
您可能希望阻止enqueue() 调用dequeue(),直到添加所有位置。
enqueue(action, autoDequeue = true) { // new
return new Promise((resolve, reject) => {
super.enqueue({ action, resolve, reject });
if (autoDequeue) this.dequeue(); // new
});
}
.. 然后在某个时候手动调用queue.dequeue()。
-
控制方式: stop / pause / start
您可以添加更多控制方法。也许您的应用程序有多个模块都尝试在页面加载时fetch() 那里的资源。 AutoQueue() 的工作方式类似于 Controller。您可以监控有多少任务正在“等待..”并添加更多控件:
class AutoQueue extends Queue {
constructor() {
this._stop = false; // new
this._pause = false; // new
}
enqueue(action) { .. }
async dequeue() {
if (this._pendingPromise) return false;
if (this._pause ) return false; // new
if (this._stop) { // new
this._queue = [];
this._stop = false;
return false;
}
let item = super.dequeue();
..
}
stop() { // new
this._stop = true;
}
pause() { // new
this._pause = true;
}
start() { // new
this._stop = false;
this._pause = false;
return await this.dequeue();
}
}
-
转发响应:
您可能希望在 next 任务中处理 task 的“响应/值”。不能保证我们的 prev. 任务尚未完成,此时我们将 2nd 任务排入队列。
因此,最好存储上一个的响应。类内的任务并转发给下一个:this._payload = await item.action(this._payload)
错误处理
task Fn 中抛出的错误拒绝enqueue() 返回的承诺,并且不会停止出队过程。您可能希望在下一个 task 开始运行之前处理错误:
queue.enqueue(queue => myTask() ).catch({ .. }); // async error handling
queue.enqueue(queue =>
myTask()
.then(payload=> otherTask(payload)) // .. inner task
.catch(() => { .. }) // sync error handling
);
由于我们的Queue 是转储,而只有await's 用于解决我们的任务(item.action(this)),没有人阻止您返回强> 来自当前运行的task Fn 的new Promise()。 - 它将在下一个任务出队之前解决。
您可以throw new Error() 在任务 Fn 内部并在“外部”/运行后处理它们:queue.enqueue(..).catch()。
您可以在调用this.stop() 的dequeue() 方法中轻松添加自定义错误处理以清除“暂停”(排队)任务..
您甚至可以从任务函数内部操作队列。检查:await item.action(this) 调用 this 并授予对 Queue 实例的访问权限。 (这是可选的)。在某些用例中,task Fn´s 不应该这样做。
添加更多功能 [高级]
...达到文本限制:D
更多:https://gist.github.com/exodus4d/6f02ed518c5a5494808366291ff1e206
了解更多