【问题标题】:Python multiprocessing Process crashes silentlyPython多处理进程静默崩溃
【发布时间】:2013-03-20 22:51:23
【问题描述】:

我使用的是 Python 2.7.3。我使用子类 multiprocessing.Process 对象并行化了一些代码。如果我的子类 Process 对象中的代码没有错误,则一切正常。但是,如果我的子类 Process 对象中的代码有错误,它们显然会默默地崩溃(没有堆栈跟踪打印到父 shell)并且 CPU 使用率将下降到零。父代码永远不会崩溃,给人的印象是执行只是挂起。同时,很难找出代码中的错误在哪里,因为没有给出错误在哪里的指示。

我在 stackoverflow 上找不到任何其他处理相同问题的问题。

我猜想子类 Process 对象似乎会默默地崩溃,因为它们无法将错误消息打印到父的 shell,但我想知道我能做些什么,以便我至少可以更有效地调试(并且这样我代码的其他用户也可以在遇到问题时告诉我)。

编辑:我的实际代码太复杂了,但是一个带有错误的子类 Process 对象的简单示例是这样的:

from multiprocessing import Process, Queue

class Worker(Process):

    def __init__(self, inputQueue, outputQueue):

        super(Worker, self).__init__()

        self.inputQueue = inputQueue
        self.outputQueue = outputQueue

    def run(self):

        for i in iter(self.inputQueue.get, 'STOP'):

            # (code that does stuff)

            1 / 0 # Dumb error

            # (more code that does stuff)

            self.outputQueue.put(result)

【问题讨论】:

  • 你能发布一个最小的测试用例来说明这个问题吗?
  • @Blender 是的。添加了一些代码。
  • 这是 Python 多处理的一个常见痛点。我建议使用 Ray github.com/ray-project/ray。像这样的异常可以很好地传播开箱即用。

标签: python python-2.7 parallel-processing multiprocessing


【解决方案1】:

您真正想要的是将异常传递给父进程的某种方式,对吗?然后你可以随心所欲地处理它们。

如果您使用concurrent.futures.ProcessPoolExecutor,这是自动的。如果你使用multiprocessing.Pool,这很简单。如果您使用显式的ProcessQueue,则需要做一些工作,但并没有太多。

例如:

def run(self):
    try:
        for i in iter(self.inputQueue.get, 'STOP'):
            # (code that does stuff)
            1 / 0 # Dumb error
            # (more code that does stuff)
            self.outputQueue.put(result)
    except Exception as e:
        self.outputQueue.put(e)

然后,您的调用代码可以像其他任何内容一样从队列中读取Exceptions。而不是这个:

yield outq.pop()

这样做:

result = outq.pop()
if isinstance(result, Exception):
    raise result
yield result

(我不知道您实际的父进程队列读取代码是做什么的,因为您的最小示例只是忽略了队列。但希望这可以解释这个想法,即使您的实际代码实际上并不是这样工作的。 )

这假设您要中止任何未处理的异常,使其达到run。如果您想传回异常并继续下一个i in iter,只需将try 移动到for 中,而不是围绕它。

这也假定Exceptions 不是有效值。如果这是一个问题,最简单的解决方案就是推送(result, exception) tuples:

def run(self):
    try:
        for i in iter(self.inputQueue.get, 'STOP'):
            # (code that does stuff)
            1 / 0 # Dumb error
            # (more code that does stuff)
            self.outputQueue.put((result, None))
    except Exception as e:
        self.outputQueue.put((None, e))

然后,您的弹出代码会这样做:

result, exception = outq.pop()
if exception:
    raise exception
yield result

您可能会注意到这类似于 node.js 回调样式,您将 (err, result) 传递给每个回调。是的,这很烦人,而且你会弄乱这种风格的代码。但是除了包装器之外,您实际上并没有在任何地方使用它;所有从队列中获取值或在run 内部调用的“应用程序级”代码只会看到正常的返回/收益和引发的异常。

您甚至可能想要考虑根据concurrent.futures 的规范构建Future(或按原样使用该类),即使您正在手动进行排队和执行工作。这并不难,而且它为您提供了一个非常好的 API,尤其是用于调试。

最后,值得注意的是,大多数围绕工作者和队列构建的代码都可以通过执行器/池设计变得更加简单,即使您绝对确定每个队列只需要一个工作者。只需废弃所有样板文件,并将Worker.run 方法中的循环转换为一个函数(这只是returns 或raises 正常,而不是附加到队列中)。在调用方,再次废弃所有样板文件,只删除 submitmap 作业函数及其参数。

你的整个例子可以简化为:

def job(i):
    # (code that does stuff)
    1 / 0 # Dumb error
    # (more code that does stuff)
    return result

with concurrent.futures.ProcessPoolExecutor(max_workers=1) as executor:
    results = executor.map(job, range(10))

它会自动正确处理异常。


正如您在 cmets 中提到的,异常的回溯不会回溯到子进程;它只涉及手动 raise result 调用(或者,如果您使用的是池或执行器,则为池或执行器的内脏)。

原因是multiprocessing.Queue 建立在pickle 之上,并且腌制异常不会腌制它们的回溯。原因是你不能腌制回溯。原因是回溯中充满了对本地执行上下文的引用,因此让它们在另一个进程中工作非常困难。

那么……你能做些什么呢?不要去寻找一个完全通用的解决方案。相反,想想你真正需要什么。 90% 的情况下,您想要的是“记录异常,使用回溯并继续”或“将异常打印到 stderrexit(1),就像默认的未处理异常处理程序一样”。对于其中任何一个,您根本不需要传递异常;只需在子端格式化它并传递一个字符串。如果您确实需要更花哨的东西,请准确计算出您需要什么,然后传递足够的信息来手动将它们组合在一起。如果您不知道如何格式化回溯和异常,请参阅traceback 模块。这很简单。这意味着您根本不需要进入泡菜机械。 (并不是说copyreg 一个pickler 或使用__reduce__ 方法或任何东西编写一个持有者类很难,但如果你不需要,为什么要学习所有这些?)

【讨论】:

  • 谢谢!这很棒。但是有没有办法打印整个堆栈跟踪?它告诉我现在有一个错误,它是什么,但不是在 Worker 类中发生错误的位置。
  • @npo:我会在答案中添加解释。
  • 如何将其应用于apply_async,它只使用了一个旨在将某些结果返回给回调的函数。我们是否只是将异步函数的内部封装在 try/except 中,然后将异常对象返回给回调?
【解决方案2】:

我建议使用这种解决方法来显示进程的异常

from multiprocessing import Process
import traceback


run_old = Process.run

def run_new(*args, **kwargs):
    try:
        run_old(*args, **kwargs)
    except (KeyboardInterrupt, SystemExit):
        raise
    except:
        traceback.print_exc(file=sys.stdout)

Process.run = run_new

【讨论】:

  • 简单,最佳答案
【解决方案3】:

这不是一个答案,只是一个扩展评论。请运行这个程序并告诉我们你得到了什么输出(如果有的话):

from multiprocessing import Process, Queue

class Worker(Process):

    def __init__(self, inputQueue, outputQueue):

        super(Worker, self).__init__()

        self.inputQueue = inputQueue
        self.outputQueue = outputQueue

    def run(self):

        for i in iter(self.inputQueue.get, 'STOP'):

            # (code that does stuff)

            1 / 0 # Dumb error

            # (more code that does stuff)

            self.outputQueue.put(result)

if __name__ == '__main__':
    inq, outq = Queue(), Queue()
    inq.put(1)
    inq.put('STOP')
    w = Worker(inq, outq)
    w.start()

我明白了:

% test.py
Process Worker-1:
Traceback (most recent call last):
  File "/usr/lib/python2.7/multiprocessing/process.py", line 258, in _bootstrap
    self.run()
  File "/home/unutbu/pybin/test.py", line 21, in run
    1 / 0 # Dumb error
ZeroDivisionError: integer division or modulo by zero

我很惊讶(如果)你什么也没得到。

【讨论】:

  • 如果他在 POSIX 上一无所获,我会感到惊讶。但是在 Windows 上,或者在 IDLE 或 PyDev 中,或者如果父进程是一个 GUI 应用程序……我不准备打赌……
  • @unutbu 我什么都没得到。使用 64 位 Windows 和 IDLE。
  • @npo:好的,如果你从控制台运行它会发生什么?
猜你喜欢
  • 2016-11-29
  • 1970-01-01
  • 2015-08-26
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 2011-07-01
  • 2018-05-06
相关资源
最近更新 更多