【问题标题】:nodejs - Sending back a stream from a worker thread to the main threadnodejs - 将流从工作线程发送回主线程
【发布时间】:2020-06-16 18:21:27
【问题描述】:

我一直试图将在我的程序中完成的一些工作分开在不同的线程中。 其中一个函数需要向主线程返回一个流,但我遇到了以下异常:

Error
    at MessagePort.<anonymous> ([worker eval]:12:16)
    at processTicksAndRejections (internal/process/task_queues.js:97:5)
From previous event:
    at PoolWorker.work (node_modules/node-worker-threads-pool/src/pool-worker.js:22:12)
    at DynamicPool.runTask (node_modules/node-worker-threads-pool/src/pool.js:110:47)
    at DynamicPool.exec (node_modules/node-worker-threads-pool/src/dynamic-pool.js:51:17)
    at renderToPdf (src/modules/templates/render2.js:27:14)
    at Context.<anonymous> (test/modules/templates/render.test.js:185:68)

我尝试构建一个最小的示例来重现我想要实现的目标。基本上,我需要的是向主线程发回一个可读流。在这个例子中,我也有一个例外:

为了拥有一个工作线程池,我专门使用了库 node-worker-threads-pool DynamicPool。在里面我试图将html 转换为PDF。但我需要以某种方式将流返回到主线程。

const os = require('os');
const { DynamicPool } = require('node-worker-threads-pool');

const Pool = new DynamicPool(os.cpus().length);

async function convertToPDF(html) {
  return await Pool.exec({
    task: function() {
      const Promise = require('bluebird');
      const pdf = require('html-pdf');

      const { html } = this.workerData;

      const htmlToPdf = (html, renderOptions) => {
        const options = {
          format: 'Letter',
        };
        return pdf.create(html, Object.assign(options, renderOptions || {}));
      };

      return Promise.fromNode((cb) => htmlToPdf(html, {}).toStream(cb));
    },
    workerData: {
      html,
    },
  });
}

convertToPDF('<div>Hello World!</div>')
  .then((resp) => console.log('resp', resp))
  .catch((err) => console.error('err', err));
err DataCloneError: function() {
    if (this.autoClose) {
      this.destroy();
    }
  } could not be cloned.
    at MessagePort.<anonymous> ([worker eval]:12:16)
    at processTicksAndRejections (internal/process/task_queues.js:97:5)

您知道我该如何实现这一目标吗?

PS:我知道 IO 操作在工作线程中的性能不如在 nodejs 主线程中,但我需要这样做以避免这些操作锁定主线程。

【问题讨论】:

    标签: node.js multithreading


    【解决方案1】:

    短版:你不能。

    节点中的IPC是通过一些黑盒处理的,但是我们知道消息对象在发送之前被序列化,一旦接收到就反序列化:你不能序列化一个Stream 因为它基于无法序列化/反序列化的底层级别(套接字、文件描述符、自定义读写函数等)。

    所以你不得不交换可序列化的数据。

    看看html-pdf 我认为转换程序的一种简单方法是使用pdf.toBuffer: 而不是尝试将Stream 发送到主线程并在主线程中读取它以获得Buffer ,您应该将Buffer 发送到主线程,然后按原样使用它。

    希望这会有所帮助。

    【讨论】:

      【解决方案2】:

      与其尝试将Stream 提供给主线程,不如直接通过管道传递它?

      创建一个在他们之间共享的MessageChannel
      父级实现Readable,将其MessagePort包装起来监听.on('message')
      该线程有一个实现Writable 的接口,基本上将任何数据从write() 直接传递到.postMessage()

      不要忘记在write() 周围添加更多实现以获取其返回值。我总是在子线程的Writable 中返回false,并使用postMessage() 将所有'drain' 事件从主线程的Readable 转发回子线程(并在主线程的 时强制/捏造一个管道-Writable 从我们的 Readable 返回 true)

      现在您可以到主线程。
      使用您想从子线程发送的原始Stream,只需将.pipe() 发送到您的Writable
      在主线程中,只需从您的 Readable 中读取或通过管道传输,就好像您真的拥有它一样。

      【讨论】:

        猜你喜欢
        • 1970-01-01
        • 1970-01-01
        • 1970-01-01
        • 1970-01-01
        • 2011-03-25
        • 1970-01-01
        • 2021-01-31
        • 1970-01-01
        • 2018-04-29
        相关资源
        最近更新 更多