在 Deno 中,就像在网络浏览器中一样,你应该能够use Web Workers to utilize 100% of a multi-core CPU。
在集群中,您需要一个“管理器”节点(根据需要/适当,它本身也可以是工作人员)。以类似的方式,Web Worker API 可用于根据需要创建任意数量的专职工作人员。这意味着主线程永远不应该阻塞,因为它可以将所有可能阻塞的任务委托给它的工作线程。不会阻塞的任务(例如简单的数据库或其他 I/O 绑定调用)可以像往常一样直接在主线程上完成。
Deno 还支持 navigator.hardwareConcurrency,因此您可以查询可用硬件并相应地确定所需工作人员的数量。不过,您可能不需要定义任何限制。从与先前生成的专用 worker 相同的来源生成新的专用 worker 可能足够快,可以按需执行此操作。即便如此,重用专门的 worker 可能比为每个请求生成一个新的 worker 更有价值。
使用Transferable Objects,大型数据集可以在不复制数据的情况下提供给工人/从工人那里获得。这与 messaging 一起使得委托任务变得非常直接,同时避免了复制大型数据集的性能瓶颈。
根据您的用例,您还可以使用像 Comlink 这样的库,“它消除了思考 postMessage 的心理障碍,并隐藏了您正在与工人一起工作的事实。”
例如
主.ts
import { serve } from "https://deno.land/std@0.133.0/http/server.ts";
import ComlinkRequestHandler from "./ComlinkRequestHandler.ts";
serve(async function handler(request) {
const worker = new Worker(new URL("./worker.ts", import.meta.url).href, {
type: "module",
});
const handler = ComlinkRequestHandler.wrap(worker);
return await handler(request);
});
工人.ts
/// <reference no-default-lib="true"/>
/// <reference lib="deno.worker" />
import ComlinkRequestHandler from "./ComlinkRequestHandler.ts";
ComlinkRequestHandler.expose(async (request) => {
const body = await request.text();
return new Response(`Hello to ${request.url}
Received:
${body}
`);
});
ComlinkRequestHandler.ts
import * as Comlink from "https://cdn.skypack.dev/comlink@4.3.1?dts";
interface RequestMessage extends Omit<RequestInit, "body" | "signal"> {
url: string;
headers: Record<string, string>;
hasBody: boolean;
}
interface ResponseMessage extends ResponseInit {
headers: Record<string, string>;
hasBody: boolean;
}
export default class ComlinkRequestHandler {
#handler: (request: Request) => Promise<Response>;
#responseBodyReader: ReadableStreamDefaultReader<Uint8Array> | undefined;
static expose(handler: (request: Request) => Promise<Response>) {
Comlink.expose(new ComlinkRequestHandler(handler));
}
static wrap(worker: Worker) {
const { handleRequest, nextResponseBodyChunk } =
Comlink.wrap<ComlinkRequestHandler>(worker);
return async (request: Request): Promise<Response> => {
const requestBodyReader = request.body?.getReader();
const requestMessage: RequestMessage = {
url: request.url,
hasBody: requestBodyReader !== undefined,
cache: request.cache,
credentials: request.credentials,
headers: Object.fromEntries(request.headers.entries()),
integrity: request.integrity,
keepalive: request.keepalive,
method: request.method,
mode: request.mode,
redirect: request.redirect,
referrer: request.referrer,
referrerPolicy: request.referrerPolicy,
};
const nextRequestBodyChunk = Comlink.proxy(async () => {
if (requestBodyReader === undefined) return undefined;
const { value } = await requestBodyReader.read();
return value;
});
const { hasBody: responseHasBody, ...responseInit } = await handleRequest(
requestMessage,
nextRequestBodyChunk
);
const responseBodyInit: BodyInit | null = responseHasBody
? new ReadableStream({
start(controller) {
async function push() {
const value = await nextResponseBodyChunk();
if (value === undefined) {
controller.close();
return;
}
controller.enqueue(value);
push();
}
push();
},
})
: null;
return new Response(responseBodyInit, responseInit);
};
}
constructor(handler: (request: Request) => Promise<Response>) {
this.#handler = handler;
}
async handleRequest(
{ url, hasBody, ...init }: RequestMessage,
nextRequestBodyChunk: () => Promise<Uint8Array | undefined>
): Promise<ResponseMessage> {
const request = new Request(
url,
hasBody
? {
...init,
body: new ReadableStream({
start(controller) {
async function push() {
const value = await nextRequestBodyChunk();
if (value === undefined) {
controller.close();
return;
}
controller.enqueue(value);
push();
}
push();
},
}),
}
: init
);
const response = await this.#handler(request);
this.#responseBodyReader = response.body?.getReader();
return {
hasBody: this.#responseBodyReader !== undefined,
headers: Object.fromEntries(response.headers.entries()),
status: response.status,
statusText: response.statusText,
};
}
async nextResponseBodyChunk(): Promise<Uint8Array | undefined> {
if (this.#responseBodyReader === undefined) return undefined;
const { value } = await this.#responseBodyReader.read();
return value;
}
}
用法示例:
% deno run --allow-net --allow-read main.ts
% curl -X POST --data '{"answer":42}' http://localhost:8000/foo/bar
Hello to http://localhost:8000/foo/bar
Received:
{"answer":42}
可能有更好的方法来做到这一点(例如通过Comlink.transferHandlers并为Request、Response和/或ReadableStream注册传输处理程序)但想法是相同的,甚至可以处理大的请求或响应负载身体通过消息流传输。