【问题标题】:Running a ProcessPoolExecutor in IPython在 IPython 中运行 ProcessPoolExecutor
【发布时间】:2020-05-18 00:54:21
【问题描述】:

我在 MacBook 上的 IPython 解释器(IPython 7.9.0、Python 3.8.0)中运行了一个简单的多处理示例,但遇到了一个奇怪的错误。这是我输入的内容:

[In [1]: from concurrent.futures import ProcessPoolExecutor

[In [2]: executor=ProcessPoolExecutor(max_workers=1)

[In [3]: def func():
             print('Hello')

[In [4]: future=executor.submit(func)

但是,我收到以下错误:

Traceback (most recent call last):
  File "/Library/Frameworks/Python.framework/Versions/3.8/lib/python3.8/multiprocessing/process.py", line 313, in _bootstrap
    self.run()
  File "/Library/Frameworks/Python.framework/Versions/3.8/lib/python3.8/multiprocessing/process.py", line 108, in run
    self._target(*self._args, **self._kwargs)                                   
  File "/Library/Frameworks/Python.framework/Versions/3.8/lib/python3.8/concurrent/futures/process.py", line 233, in _process_worker
    call_item = call_queue.get(block=True)
  File "/Library/Frameworks/Python.framework/Versions/3.8/lib/python3.8/multiprocessing/queues.py", line 116, in get
    return _ForkingPickler.loads(res)
AttributeError: Can't get attribute 'func' on <module '__main__' (built-in)>

此外,再次尝试提交作业给了我一个不同的错误:

[In [5]: future=executor.submit(func)                                            
---------------------------------------------------------------------------
BrokenProcessPool                         Traceback (most recent call last)
<ipython-input-5-42bad1a6fe80> in <module>
----> 1 future=executor.submit(func)

/Library/Frameworks/Python.framework/Versions/3.8/lib/python3.8/concurrent/futures/process.py in submit(*args, **kwargs)
    627         with self._shutdown_lock:
    628             if self._broken:
--> 629                 raise BrokenProcessPool(self._broken)
    630             if self._shutdown_thread:
    631                 raise RuntimeError('cannot schedule new futures after shutdown')

BrokenProcessPool: A child process terminated abruptly, the process pool is not usable anymore

作为完整性检查,我将相同的(几乎)代码输入到 Python 文件中,并从命令行 (python3 test.py) 运行它。效果很好。

为什么 IPython 对我的测试有问题?

编辑:

这是运行良好的 Python 文件。

from concurrent.futures import ProcessPoolExecutor as Executor

def func():
        print('Hello')

if __name__ == '__main__':
        with Executor(1) as executor:
                future=executor.submit(func)
                print(future.result())

【问题讨论】:

  • 你的环境是什么?我在 Ubuntu 上的 ipython (7.14) 中运行了你的代码,它运行良好。我知道与 Windows 相关的多处理问题,但我没有要测试的 Windows 机器。如果您在 Windows 上运行,请将其添加到问题中,因为它可能是相关的。
  • 我在 MacBook 上运行。我已将其添加到 OP 中。
  • 我刚刚将 iPython 升级到 7.14 并再次运行它。同样的错误。

标签: python multiprocessing ipython


【解决方案1】:

好的,终于知道是怎么回事了。问题是 Mac OS - 它默认使用“spawn”方法来创建子进程。这里解释了https://docs.python.org/3/library/multiprocessing.html 以及将其更改为 fork 的方法(尽管它指出 fork 在 Mac os 上是不安全的)。

使用 spawn 方法会启动一个新的 Python 解释器,并将您的代码提供给它。然后尝试在 main 下找到您的函数,但在这种情况下,没有 main,因为没有程序,只是解释了命令。

如果您将 start 方法更改为 fork,您的代码会运行(但请注意这是不安全的)

In [1]: import multiprocessing as mp                                                                                     

In [2]: mp.set_start_method("fork")                                                                                      

In [3]: def func(): 
   ...:     print("foo"); 
   ...:                                               

In [4]: from concurrent.futures import ProcessPoolExecutor                                                               

In [5]: executor=ProcessPoolExecutor(max_workers=1)                                                               

In [6]: future=executor.submit(func)                                                                                     

foo
In [7]:  

由于警告,我不确定答案是否有帮助,但它解释了为什么当你有一个程序(你的其他尝试)时它的行为会有所不同,以及为什么它在 Ubuntu 上运行良好 - 它默认使用“fork”。

【讨论】:

  • 啊哈!做到了!谢谢!
  • 只是想多一点......当使用 fork 方法时,事情的顺序(可能)很重要。如果您在创建子流程后定义 func(),则子流程可能没有继承它,您将再次遇到异常。在我的示例中,出于这个原因,我直观地做了我一直对多处理和池所做的事情 - 首先定义函数并稍后声明池,但我意识到在你的 OP 中你有不同的顺序。它可能工作也可能不工作,不确定 ProcessPoolExecutor 是否在池声明或作业提交时启动进程。
【解决方案2】:

TLDR;

import multiprocessing as mp
from concurrent.futures import ProcessPoolExecutor

# create child processes using 'fork' context
executor = ProcessPoolExecutor(max_workers=1, mp_context=mp.get_context('fork'))

这实际上是由 MacOS 上的 python 3.8 切换到“spawn”方法来创建子进程造成的;与 3.8 之前的默认“fork”相反。以下是一些本质区别:

分叉:

  • 克隆父进程的数据和代码,从而继承父程序的状态。
  • 子进程对继承变量所做的任何修改都不会反映父进程中这些变量的状态。从这一点开始,这些状态基本上是分叉的(写时复制)。
  • 在父进程中导入的所有库都可以在子进程的上下文中使用。这也使此方法速度更快,因为子进程不必重新导入库(代码)和变量(数据)。
  • 这有一些缺点,尤其是在派生多线程程序方面。
  • 某些具有 C 后端的库(例如 Tensorflow、OpenCV 等)不是 fork 安全的,会导致子进程以不确定的方式挂起。

产卵:

  • 为子进程创建一个全新的解释器,而不继承代码或数据。
  • 只有必要的数据/参数被发送到子进程。这意味着子进程不会自动使用变量、线程锁、文件描述符等——这避免了难以捕获的错误。
  • 这种方法也有一些缺点——因为数据/参数需要发送到子进程,它们也必须是可腌制的。某些具有内部锁/互斥锁的对象(例如队列)不能腌制,并且腌制较重的对象(例如数据帧和大型 numpy 数组)很昂贵。
  • 在子进程上取消腌制对象将导致重新导入关联的库(如果有)。这又增加了时间。
  • 由于没有将父代码克隆到子进程中,因此您需要在创建子进程时使用if __name__ == '__main__' 保护。不这样做会使子进程无法从父进程(现在作为 ma​​in 运行)导入代码。这也是为什么您的程序在与警卫一起使用时可以正常工作的原因。

如果您注意到 fork 会带来一些由您的程序或导入的非 fork 安全库引起的不可预知的影响,您可以:

  • (a) 全局设置多处理上下文以使用“fork”方法:
import multiprocessing as mp

mp.set_start_method("fork")

请注意,这将全局设置上下文,一旦设置,您或任何其他导入的库将无法更改此上下文。

  • (b) 使用多处理的get_context 方法在本地设置上下文:
import multiprocessing as mp
mp_fork = mp.get_context('fork')

# mp_fork has all the attributes of mp so you can do:
mp_fork.Process(...)  
mp_fork.Pool(...)

# using local context will not change global behaviour:
# create child process using global context
# default is fork in < 3.8; spawn otherwise
mp.Process(...)

# most multiprocessing based functionality like ProcessPoolExecutor 
# also take context as an argument:
executor=ProcessPoolExecutor(max_workers=1, mp_context=mp_fork)

【讨论】:

  • import multiprocessing as mp mp.set_start_method("fork") 这适用于 Python 3.6 吗?
猜你喜欢
  • 1970-01-01
  • 1970-01-01
  • 2016-06-09
  • 2018-04-20
  • 1970-01-01
  • 2012-07-29
  • 1970-01-01
  • 1970-01-01
  • 2019-10-28
相关资源
最近更新 更多