【问题标题】:Deno on multi-core machines多核机器上的 Deno
【发布时间】:2023-01-23 04:28:21
【问题描述】:

在 Node.js 中,有集群模块可以利用机器上所有可用的核心,这非常棒,尤其是与节点模块pm2一起使用时。但我对 Deno 的一些功能非常感兴趣,但我想知道如何在多核机器上最好地运行它。

我知道有些工作人员非常适合特定任务,但对于正常的 Web 请求,多核机器的性能似乎有些浪费?在 Deno 中获得最大可用性和利用率的最佳策略是什么?

我有点担心,如果你只有一个进程在运行,并且有一些 CPU 密集型任务,无论出于何种原因,它都会“阻止”所有其他请求进入。在 node.js 中,集群模块会解决这个问题,因为另一个进程会处理请求,但我不确定如何在 Deno 中处理这个请求?

我认为你可以在 Deno 中的不同端口上运行多个实例,然后在它前面有某种负载均衡器,但相比之下,这似乎是一个相当复杂的设置。我还了解到您可以使用某种服务,例如 Deno Deploy 或其他任何服务,但我已经有了要运行它的硬件。

我有什么选择? 提前感谢您的明智建议和更好的智慧。

【问题讨论】:

  • Workerssubprocess API 是 Deno 中唯一的多线程抽象。听起来您想要的是 Worker API 之上的池抽象。目前还不存在这样的东西,但似乎已经编写了实现。您是否已经搜索过类似的东西?

标签: multithreading multiprocessing multicore deno


【解决方案1】:

在 Deno 中,就像在网络浏览器中一样,你应该能够use Web Workers to utilize 100% of a multi-core CPU

在集群中,您需要一个“管理器”节点(根据需要/适当,它本身也可以是工作人员)。以类似的方式,Web Worker API 可用于根据需要创建任意数量的专职工作人员。这意味着主线程永远不应该阻塞,因为它可以将所有可能阻塞的任务委托给它的工作线程。不会阻塞的任务(例如简单的数据库或其他 I/O 绑定调用)可以像往常一样直接在主线程上完成。

Deno 还支持 navigator.hardwareConcurrency,因此您可以查询可用硬件并相应地确定所需工作人员的数量。不过,您可能不需要定义任何限制。从与先前生成的专用 worker 相同的来源生成新的专用 worker 可能足够快,可以按需执行此操作。即便如此,重用专门的 worker 可能比为每个请求生成一个新的 worker 更有价值。

使用Transferable Objects,大型数据集可以在不复制数据的情况下提供给工人/从工人那里获得。这与 messaging 一起使得委托任务变得非常直接,同时避免了复制大型数据集的性能瓶颈。

根据您的用例,您还可以使用像 Comlink 这样的库,“它消除了思考 postMessage 的心理障碍,并隐藏了您正在与工人一起工作的事实。”

例如

主.ts

import { serve } from "https://deno.land/std@0.133.0/http/server.ts";

import ComlinkRequestHandler from "./ComlinkRequestHandler.ts";

serve(async function handler(request) {
  const worker = new Worker(new URL("./worker.ts", import.meta.url).href, {
    type: "module",
  });

  const handler = ComlinkRequestHandler.wrap(worker);

  return await handler(request);
});

工人.ts

/// <reference no-default-lib="true"/>
/// <reference lib="deno.worker" />

import ComlinkRequestHandler from "./ComlinkRequestHandler.ts";

ComlinkRequestHandler.expose(async (request) => {
  const body = await request.text();
  return new Response(`Hello to ${request.url}

Received:

${body}
`);
});

ComlinkRequestHandler.ts

import * as Comlink from "https://cdn.skypack.dev/comlink@4.3.1?dts";

interface RequestMessage extends Omit<RequestInit, "body" | "signal"> {
  url: string;
  headers: Record<string, string>;
  hasBody: boolean;
}

interface ResponseMessage extends ResponseInit {
  headers: Record<string, string>;
  hasBody: boolean;
}

export default class ComlinkRequestHandler {
  #handler: (request: Request) => Promise<Response>;
  #responseBodyReader: ReadableStreamDefaultReader<Uint8Array> | undefined;

  static expose(handler: (request: Request) => Promise<Response>) {
    Comlink.expose(new ComlinkRequestHandler(handler));
  }

  static wrap(worker: Worker) {
    const { handleRequest, nextResponseBodyChunk } =
      Comlink.wrap<ComlinkRequestHandler>(worker);

    return async (request: Request): Promise<Response> => {
      const requestBodyReader = request.body?.getReader();

      const requestMessage: RequestMessage = {
        url: request.url,
        hasBody: requestBodyReader !== undefined,
        cache: request.cache,
        credentials: request.credentials,
        headers: Object.fromEntries(request.headers.entries()),
        integrity: request.integrity,
        keepalive: request.keepalive,
        method: request.method,
        mode: request.mode,
        redirect: request.redirect,
        referrer: request.referrer,
        referrerPolicy: request.referrerPolicy,
      };

      const nextRequestBodyChunk = Comlink.proxy(async () => {
        if (requestBodyReader === undefined) return undefined;
        const { value } = await requestBodyReader.read();
        return value;
      });

      const { hasBody: responseHasBody, ...responseInit } = await handleRequest(
        requestMessage,
        nextRequestBodyChunk
      );

      const responseBodyInit: BodyInit | null = responseHasBody
        ? new ReadableStream({
            start(controller) {
              async function push() {
                const value = await nextResponseBodyChunk();
                if (value === undefined) {
                  controller.close();
                  return;
                }
                controller.enqueue(value);
                push();
              }

              push();
            },
          })
        : null;

      return new Response(responseBodyInit, responseInit);
    };
  }

  constructor(handler: (request: Request) => Promise<Response>) {
    this.#handler = handler;
  }

  async handleRequest(
    { url, hasBody, ...init }: RequestMessage,
    nextRequestBodyChunk: () => Promise<Uint8Array | undefined>
  ): Promise<ResponseMessage> {
    const request = new Request(
      url,
      hasBody
        ? {
            ...init,
            body: new ReadableStream({
              start(controller) {
                async function push() {
                  const value = await nextRequestBodyChunk();
                  if (value === undefined) {
                    controller.close();
                    return;
                  }
                  controller.enqueue(value);
                  push();
                }

                push();
              },
            }),
          }
        : init
    );
    const response = await this.#handler(request);
    this.#responseBodyReader = response.body?.getReader();
    return {
      hasBody: this.#responseBodyReader !== undefined,
      headers: Object.fromEntries(response.headers.entries()),
      status: response.status,
      statusText: response.statusText,
    };
  }

  async nextResponseBodyChunk(): Promise<Uint8Array | undefined> {
    if (this.#responseBodyReader === undefined) return undefined;
    const { value } = await this.#responseBodyReader.read();
    return value;
  }
}

用法示例:

% deno run --allow-net --allow-read main.ts
% curl -X POST --data '{"answer":42}' http://localhost:8000/foo/bar
Hello to http://localhost:8000/foo/bar

Received:

{"answer":42}

可能有更好的方法来做到这一点(例如通过Comlink.transferHandlers并为RequestResponse和/或ReadableStream注册传输处理程序)但想法是相同的,甚至可以处理大的请求或响应负载身体通过消息流传输。

【讨论】:

  • 我有兴趣了解多个工作人员在哪些特定用例中不能很好地工作。网络 API(例如)通常甚至不需要任何网络工作人员,因为数据库调用不会阻塞等。我唯一可以做到的场景想想网络工作者真正派上用场的地方是服务器在内存中计算某些东西并需要时间的地方。这样可以委托给一个工作者,然后主线程仍然完全可用于非阻塞请求,甚至其他阻塞请求委托给它的工作者池。
  • 我所说的是例如内存中的意外计算。也许我今天写了一个端点并没有考虑到这个端点可以增长。随着更多的用户和数据的增长,端点突然变慢,因为有更多的数据需要处理。这在我身上发生过 node 并且基本上使应用程序崩溃,直到我有时间修复它。我知道这几乎是可以解决的,但至少节点中的集群模块例如会在某种程度上防止这种情况。
  • 哦,我想我开始理解得更好了,你之前解释过,但我不明白。 ? 是的,我可以看到将每个电话,甚至是微不足道的电话,委托给员工可能是一个烦人的设置,但也许不是。每项工作都可以从同一来源产生,所以我认为这实际上只是将整个请求和响应转发给工作人员和从工作人员那里转发的问题。我没有使用过节点集群,但我想这基本上就是它在做什么。
  • 我更新了我的答案,添加了一些关于管理工作人员的额外想法以及关于名为 Comlink 的库的标注。我目前不知道有更好的方法来做你正在谈论的事情,我认为你应该能够以最小的努力将所有呼叫委托给同质工人,我认为这将有助于保持解决方案足够简单。
  • 谢谢 @mfulton26 我会检查 comlink,因为我以前成功使用过它。不过忘记了那个图书馆。如果没有其他合理的方法来做到这一点,我会奖励你的代表;)
【解决方案2】:

这完全取决于您希望推送给线程的工作负载。如果您对在主线程上运行的内置 Deno HTTP 服务器的性能感到满意,但您需要利用多线程更有效地创建响应,那么从 Deno v1.29.4 开始就很简单了。

HTTP 服务器会给你一个异步迭代器server,比如

import { serve } from "https://deno.land/std/http/server.ts";

const server = serve({ port: 8000 });

然后你可以使用内置功能pooledMap,比如

import { pooledMap } from "https://deno.land/std@0.173.0/async/pool.ts";

const ress = pooledMap( window.navigator.hardwareConcurrency - 1
                      , server
                      , req => new Promise(v => v(respond(req))
                      );

for await (const res of ress) {
  // respond with res
}

其中 respond 是一个处理接收到的请求并生成响应对象的函数。如果 respond 已经是一个异步函数,那么你甚至不需要将它包装成一个承诺。

但是,如果你想在单独的 therads 上运行多个 Deno HTTP 服务器,那也是可能的,但你需要一个像 GoBetween 这样的负载均衡器。在这种情况下,您应该在单独的线程中实例化多个 Deno HTTP 服务器,并在主线程中将它们的请求作为单独的异步迭代器接收。要实现这一点,每个线程你都可以这样做;

在工人方面,即./servers/server_800X.ts

import { serve } from "https://deno.land/std/http/server.ts";

const server = serve({ port: 800X });
console.log("Listening on http://localhost:800X/");

for await (const req of server) {
  postMessage({ type: "request", req });
}

在主线程中,您可以轻松地将相应的 worker http 服务器转换为异步迭代器,例如

async function* server_800X() {
  worker_800X.onmessage = event => {
    if (event.data.type === "request") {
      yield event.data.req;
    }
  };
}

for await (const req of server_800X()) {
  // Handle the request here in the main thread
}

您还应该能够通过使用 MuxAsyncIterators 功能将 HTTP (req) 或 res 异步迭代器多路复用到单个流中,然后由 pooledMap 生成。因此,如果您有 2 个 http 服务器在 server_8000.tsserver_8001.ts 上工作,那么您可以将它们多路复用到一个异步迭代器中,例如

const muxedServer = new MuxAsyncIterator<Request>();
muxedServer.add(server_8000);
muxedServer.add(server_8001);
for await (const req of muxedServer) {
  // repond accordingly(*)
}

显然,您还应该能够生成新线程来处理从 muxedServer 收到的请求,方法是利用 pooledMap,如上所示。

(*) 如果您选择使用负载均衡器和多个 Deno http 服务器,那么您应该为负载均衡器上的请求分配特殊的标头,指定它被转移到的服务器 ID。这样,通过检查这个特殊的标头,您可以决定从哪个服务器响应任何特定请求。

【讨论】:

    猜你喜欢
    • 2012-03-26
    • 2023-04-01
    • 2012-02-07
    • 2010-12-16
    • 2011-10-22
    • 2011-03-09
    • 2011-08-04
    • 2020-10-21
    • 1970-01-01
    相关资源
    最近更新 更多