【问题标题】:Run NodeJS event loop / wait for child process to finish运行 NodeJS 事件循环/等待子进程完成
【发布时间】:2013-04-28 23:44:47
【问题描述】:

我首先尝试了对问题的一般描述,然后更详细地描述了为什么通常的方法不起作用。如果您想阅读这些抽象的解释,请继续。最后我解释了更大的问题和具体的应用,所以如果你愿意阅读,请跳转到“实际应用”。

我正在使用 node.js 子进程来执行一些计算密集型工作。父进程完成了它的工作,但在执行的某个时刻,它到达了一个点,它必须在继续之前从子进程获得信息。因此,我正在寻找一种方法来等待子进程完成。

我当前的设置有点像这样:

importantDataCalculator = fork("./runtime");
importantDataCalculator.on("message", function (msg) {
    if (msg.type === "result") {
        importantData = msg.data;
    } else if (msg.type === "error") {
        importantData = null;
    } else {
        throw new Error("Unknown message from dataGenerator!");
    }
});

还有其他地方

function getImportantData() {
    while (importantData === undefined) {
        // wait for the importantDataGenerator to finish
    }

    if (importantData === null) {
        throw new Error("Data could not be generated.");
    } else {
        // we should have a proper data now
        return importantData;
    }
}

所以当父进程启动时,它会执行第一段代码,生成一个子进程来计算数据并继续做它自己的工作。当它需要子进程的结果来继续时,它会调用getImportantData()。所以想法是getImportantData() 阻塞,直到计算出数据。

但是,我使用的方式不起作用。我认为这是由于我使用 while 循环阻止了事件循环的执行。并且由于 Event-Loop 不执行,因此无法接收到来自子进程的消息,因此 while-loop 的条件不能改变,使其成为无限循环。

当然,我真的不想使用这种 while 循环。我宁愿告诉 node.js “执行事件循环的一次迭代,然后返回给我”。我会重复这样做,直到收到我需要的数据,然后通过从 getter 返回继续执行我离开的地方。

我意识到他会带来多次重新进入相​​同功能的危险,但是我想使用它的模块在事件循环中几乎没有做任何事情,除了等待来自子进程的这条消息并发送其他消息报告这是进步,所以这应该不是问题。

有没有办法在 Node.js 中只执行一次事件循环迭代?还是有另一种方法可以实现类似的目标?还是有完全不同的方法来实现我在这里尝试做的事情?

到目前为止,我能想到的唯一解决方案是以我引入另一个过程的方式更改计算。在这种情况下,将有一个计算重要数据的进程,一个计算不需要重要数据的数据位的进程以及这两个进程的父进程,它只是等待来自两个子进程的数据并组合当它们到达时。由于它本身不需要做任何计算密集型的工作,它可以只等待来自事件循环的事件(=消息)并对它们做出反应,根据需要转发组合的数据并存储尚不能组合的数据片段。 然而,这引入了另一个进程,甚至更多的进程间通信,这引入了更多的开销,我想避免这种情况。

编辑

我认为需要更多细节。

父进程(我们称其为进程 1)本身就是由另一个进程(进程 0)生成的进程,用于执行一些计算密集型工作。实际上,它只是执行一些我无法控制的代码,所以我不能让它异步工作。我能做(并且已经做过)是让定期执行的代码调用一个函数来报告它的进度并提供部分结果。然后通过 IPC 将此进度报告发送回原始流程。

但在极少数情况下,部分结果不正确,因此必须对其进行修改。为此,我需要一些可以独立于正常计算的数据。但是,此计算可能需要几秒钟;因此,我启动另一个进程(进程 2)来执行此计算并通过 IPC 消息将结果提供给进程 1。现在进程 1 和 2 正在愉快地计算那里的东西,希望进程 2 计算的校正数据在进程 1 需要它之前完成。但有时需要更正流程 1 的早期结果之一,在这种情况下,我必须等待流程 2 完成计算。阻塞进程1的事件循环理论上是没有问题的,因为主进程(进程0)不会受到它的影响。唯一的问题是,通过阻止进程 1 中代码的进一步执行,我也阻塞了事件循环,这阻止了它从进程 2 接收结果。

所以我需要在不阻塞事件循环的情况下暂停进程 1 中代码的进一步执行。我希望有一个像 process.runEventLoopIteration 这样的调用来执行事件循环的迭代然后返回。

然后我会像这样更改代码:

function getImportantData() {
    while (importantData === undefined) {
        process.runEventLoopIteration();
    }

    if (importantData === null) {
        throw new Error("Data could not be generated.");
    } else {
        // we should have a proper data now
        return importantData;
    }
}

因此执行事件循环,直到我收到必要的数据,但不继续执行调用 getImportantData() 的代码。

基本上我在流程 1 中所做的是:

function callback(partialDataMessage) {
    if (partialDataMessage.needsCorrection) {
        getImportantData();
        // use data to correct message
        process.send(correctedMessage); // send corrected result to main process
    } else {
        process.send(partialDataMessage); // send unmodified result to main process
    }
}

function executeCode(code) {
    run(code, callback); // the callback will be called from time to time when the code produces new data
    // this call is synchronous, run is blocking until the calculation is finished
    // so if we reach this point we are done
    // the only way to pause the execution of the code is to NOT return from the callback 
}

实际应用/实现/问题

以下应用程序需要此行为。如果您有更好的方法来实现这一点,请随时提出。

我想执行任意代码并收到有关它更改了哪些变量、调用了哪些函数、发生了哪些异常等的通知。我还需要这些事件在代码中的位置,以便能够在 UI 中显示收集到的信息在原始代码旁边。

为了实现这一点,我检测代码并将回调插入其中。然后我执行代码,将执行包装在一个 try-catch 块中。每当使用有关执行的一些数据(例如变量更改)调用回调时,我都会向主进程发送一条消息,告诉它有关更改的信息。这样,当代码运行时,用户就会收到有关代码执行的通知。这些回调生成的事件的位置信息会在检测期间添加到回调调用中,所以这不是问题。

当异常发生时,问题就出现了。我还想通知用户测试代码中的异常。因此,我将代码的执行包装在一个 try-catch 中,任何超出执行的异常都会被捕获并发送到用户界面。但是错误的位置不正确。 node.js 创建的 Error 对象有一个完整的调用堆栈,因此它知道它发生在哪里。但是这个位置如果相对于检测代码,所以我不能按原样使用这个位置信息,在原始代码旁边显示错误。我需要将检测代码中的这个位置转换为原始代码中的一个位置。为此,在检测代码之后,我计算了一个 source map 以将检测代码中的位置映射到原始代码中的位置。但是,此计算可能需要几秒钟。所以,我想,我会启动一个子进程来计算源映射,而检测代码的执行已经开始。然后,当出现异常时,我会检查源地图是否已经计算过,如果没有,则等待计算完成才能更正位置。

由于要执行和监视的代码可以是完全任意的,我不能轻易地将其重写为异步的。我只知道它调用了提供的回调,因为我检测了代码来这样做。我也不能只存储消息并返回继续执行代码,在下次调用期间检查源映射是否已完成,因为继续执行代码也会阻塞事件循环,阻止计算源在执行过程中从未收到过映射。或者如果接收到它,那么只有在要执行的代码完全完成之后,这可能会很晚或永远不会(如果要执行的代码包含无限循环)。但在收到 sourceMap 之前,我无法发送有关执行状态的进一步更新。结合起来,这意味着我只能在要执行的代码完成后(可能永远不会)发送更正的进度消息,这完全违背了程序的目的(使程序员能够观察代码的作用,而它执行)。

暂时将控制权交给事件循环可以解决这个问题。然而,这似乎是不可能的。我的另一个想法是引入第三个过程,它控制执行过程和 sourceMapGeneration 过程。它从执行过程接收进度消息,如果有任何消息需要更正,它会等待 sourceMapGeneration 过程。由于进程是独立的,所以控制进程可以存储接收到的消息并等待sourceMapGeneration进程,而执行进程继续执行,一旦收到源映射,它就会更正消息并将它们全部发送出去。

但是,这不仅需要另一个进程(开销),还意味着我必须在进程之间再次传输代码,因为代码可能有数千行本身可能需要一些时间,所以我想尽可能少地移动它。

我希望这能解释为什么我不能也没有使用通常的“异步回调”方法。

【问题讨论】:

  • Fibers 是一个选项吗?纤维让你做到这一点(睡眠、屈服、跑步和暂停),并且看起来是你想要做的事情的好方法。 (查看示例)
  • @BenjaminGruenbaum Fibers 听起来非常适合我的问题。我会尝试并报告。如果您将此添加为答案,我会接受。
  • 让我知道 Fibers 是否有效,如果有效,我可以将其表述为答案。
  • @BenjaminGruenbaum Fibers 就像一个魅力。他们做的正是我想要的。由于在进程报告中间执行事件循环,我遇到了其他一些问题,但我认为这是我的代码的另一个问题。
  • 如果纤程工作,您可能还需要检查wait.for,基于纤程:github.com/luciotato/waitfor

标签: node.js event-loop


【解决方案1】:

在澄清您寻求的行为后,添加第三个​​ (:)) 解决方案我建议使用Fibers

Fibers 让你在 nodejs 中做co-routines。协程是允许多个入口/出口点的函数。这意味着您将能够让出控制权并随心所欲地恢复它。

这里有一个来自官方文档的sleep 函数,它就是这样做的,休眠给定的时间并执行操作。

function sleep(ms) {
    var fiber = Fiber.current;
    setTimeout(function() {
        fiber.run();
    }, ms);
    Fiber.yield();
}

Fiber(function() {
    console.log('wait... ' + new Date);
    sleep(1000);
    console.log('ok... ' + new Date);
}).run();
console.log('back in main');

您可以将等待资源的代码放在一个函数中,使其屈服,然后在任务完成后再次运行。

例如,根据问题改编您的示例:

var pausedExecution, importantData;
function getImportantData() {
    while (importantData === undefined) {
        pausedExecution = Fiber.current;
        Fiber.yield();
        pausedExecution = undefined;
    }

    if (importantData === null) {
        throw new Error("Data could not be generated.");
    } else {
        // we should have proper data now
        return importantData;
    }
}

function callback(partialDataMessage) {
    if (partialDataMessage.needsCorrection) {
        var theData = getImportantData();
        // use data to correct message
        process.send(correctedMessage); // send corrected result to main process
    } else {
        process.send(partialDataMessage); // send unmodified result to main process
    }
}

function executeCode(code) {
    // setup child process to calculate the data
    importantDataCalculator = fork("./runtime");
    importantDataCalculator.on("message", function (msg) {
        if (msg.type === "result") {
            importantData = msg.data;
        } else if (msg.type === "error") {
            importantData = null;
        } else {
            throw new Error("Unknown message from dataGenerator!");
        }

        if (pausedExecution) {
            // execution is waiting for the data
            pausedExecution.run();
        }
    });


    // wrap the execution of the code in a Fiber, so it can be paused
    Fiber(function () {
        runCodeWithCallback(code, callback); // the callback will be called from time to time when the code produces new data
        // this callback is synchronous and blocking,
        // but it will yield control to the event loop if it has to wait for the child-process to finish
    }).run();
}

祝你好运!我总是说用 3 种方法解决一个问题比用同样的方法解决 3 个问题要好。我很高兴我们能够为您解决一些问题。诚然,这是一个非常有趣的问题。

【讨论】:

    【解决方案2】:

    异步编程的规则是,一旦输入了异步代码,就必须继续使用异步代码。虽然您可以继续通过setImmediate 或类似的方式一遍又一遍地调用该函数,但您仍然会遇到您尝试从异步进程中调用return 的问题。

    在不了解您的程序的情况下,我无法确切告诉您应该如何构建它,但总的来说,从涉及异步代码的进程中“返回”数据的方法是传入回调;也许这会让你走上正轨:

    function getImportantData(callback) {
        importantDataCalculator = fork("./runtime");
        importantDataCalculator.on("message", function (msg) {
            if (msg.type === "result") {
                callback(null, msg.data);
            } else if (msg.type === "error") {
                callback(new Error("Data could not be generated."));
            } else {
                callback(new Error("Unknown message from sourceMapGenerator!"));
            }
        });
    }
    

    然后你会像这样使用这个函数:

    getImportantData(function(error, data) {
        if (error) {
            // handle the error somehow
        } else {
            // `data` is the data from the forked process
        }
    });
    

    我在我的一个截屏视频Thinking Asynchronously 中更详细地讨论了这一点。

    【讨论】:

    • 如果可以的话,我会使用回调。我遇到的问题是父进程本身会进行一些广泛的计算,但会定期调用特定的函数来报告进度。在某些情况下,要正确报告进度,需要来自子进程的数据,因此我需要确保在从报告函数返回之前获取这些数据,从而让父进程继续执行。我将编辑我的问题以更详细地解释这一点。
    • 这种逻辑可以(并且应该)以异步方式编写;试图违背它将使 Node 中的编程变得非常困难和尴尬。 promisesflow control libraries with async versions of things like while 之类的构造可以提供帮助,但确实需要以非常不同的方式考虑逻辑流程。
    【解决方案3】:

    您遇到的是一个非常常见的场景,从 nodejs 开始的熟练程序员经常会遇到这种情况。

    你是对的。您不能按照您尝试的方式执行此操作(循环)。

    node.js 中的主进程是单线程的,您正在阻塞事件循环。

    解决这个问题的最简单方法是:

    function getImportantData() {
        if(importantData === undefined){ // not set yet
            setImmediate(getImportantData); // try again on the next event loop cycle
            return; //stop this attempt
        }
    
        if (importantData === null) {
            throw new Error("Data could not be generated.");
        } else {
            // we should have a proper data now
            return importantData;
        }
    }
    

    我们正在做的是,该函数正在重新尝试使用 setImmediate 在事件循环的下一次迭代中处理数据。

    这引入了一个新问题,你的函数返回一个值。由于它尚未准备好,因此您返回的值是未定义的。所以你去编码响应式。您需要告诉您的代码数据到达时要做什么。

    这通常在带有回调的节点中完成

    function getImportantData(err,whenDone) {
        if(importantData === undefined){ // not set yet
            setImmediate(getImportantData.bind(null,whenDone)); // try again on the next event loop cycle
            return; //stop this attempt
        }
    
        if (importantData === null) {
            err("Data could not be generated.");
        } else {
            // we should have a proper data now
            whenDone(importantData);
        }
    }
    

    这可以通过以下方式使用

    getImportantData(function(err){
        throw new Error(err); // error handling function callback
    }, function(data){ //this is whenDone in our case
        //perform actions on the important data
    })
    

    【讨论】:

    • 我建议不要使用setImmediate,而是使用回调传递或Promises。 setImmediate 感觉就像试图让异步操作执行得有点像同步操作。
    • @robertklep 我完全同意,好评。最佳做法是从“消息”的回调中调用 getImportantData 或使用事件。
    • @robertklep 实际上,我认为这就是我想要的。异步执行某些操作,但是当它没有及时准备好时,就好像它是一个同步调用一样等待它。
    • 据我了解,这与 Brandon Tilley 提出的方法大致相同。我将编辑问题以解释为什么它在我的情况下不起作用。
    • @JoachimKurz 你不能阻止这一点。 importantDataCalculator 上的事件在有机会之前不会触发,这意味着代码必须首先执行(并完成执行)。这是 nodejs 如何工作的基础。这也很有意义,因为这意味着您响应式地编写代码,您对事件做出反应。
    【解决方案4】:

    您的问题(已更新)非常有趣,它似乎与我在异步捕获异常时遇到的问题密切相关。 (还有 Brandon 和我就这件事和我进行了一次有趣的讨论!这是一个小世界)

    请参阅this question,了解如何异步捕获异常。关键概念是您可以使用(假设 nodejs 0.8+)nodejs domains 来限制异常的范围。

    这将允许您轻松获取异常的位置,因为您可以使用 atry/catch 包围异步块。我认为这应该解决这里更大的问题。

    您可以在链接的问题中找到相关代码。用法是这样的:

    atry(function() {
        setTimeout(function(){
            throw "something";
        },1000);
    }).catch(function(err){
        console.log("caught "+err);
    });
    

    由于您可以访问atry 的范围,因此您可以在那里获得堆栈跟踪,这样您就可以跳过更复杂的源映射使用。

    祝你好运!

    【讨论】:

    • 这实际上也很有趣,也与更普遍的问题有关,谢谢。但是,我想我们可能对“位置”有不同的理解。这种方法使我能够捕获异步异常并找到引发它们的函数(位置)。但是,我所说的位置是抛出异常的确切行和列,以便能够在该位置的 UI 中显示它。我认为这种方法对我没有帮助,对吗?尽管如此,我还是可以使用域。
    猜你喜欢
    • 1970-01-01
    • 2014-07-19
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2013-08-11
    • 1970-01-01
    • 2022-05-11
    相关资源
    最近更新 更多