【发布时间】:2018-11-30 21:28:45
【问题描述】:
流程未清理以供重复使用
你好,
我偶然发现了ProcessPoolExecutor 的问题,其中进程访问
数据,他们应该做不到。让我解释一下:
我遇到了类似于以下示例的情况:我有几次跑步要开始 每个都有不同的论据。他们并行计算他们的东西并且没有 互相交流的理由。现在,据我了解,当一个过程 叉子,它会自我复制。子进程具有相同的(内存)数据,如 它的父级,但如果它改变任何东西,它会在它自己的副本上这样做。如果我 希望更改能够在子进程的生命周期中存活下来,我会 调用队列、管道和其他 IPC 内容。
但实际上我没有!每个进程都为自己处理数据,这 不应延续到任何其他运行。下面的例子显示 否则,虽然。下一次运行(不是并行运行的)可以访问 他们之前运行的数据,暗示数据尚未被清理 从过程中。
代码/示例
from concurrent.futures import ProcessPoolExecutor
from multiprocessing import current_process, set_start_method
class Static:
integer: int = 0
def inprocess(run: int) -> None:
cp = current_process()
# Print current state
print(f"[{run:2d} {cp.pid} {cp.name}] int: {Static.integer}", flush=True)
# Check value
if Static.integer != 0:
raise Exception(f"[{run:2d} {cp.pid} {cp.name}] Variable already set!")
# Update value
Static.integer = run + 1
def pooling():
cp = current_process()
# Get master's pid
print(f"[{cp.pid} {cp.name}] Start")
with ProcessPoolExecutor(max_workers=2) as executor:
for i, _ in enumerate(executor.map(inprocess, range(4))):
print(f"run #{i} finished", flush=True)
if __name__ == '__main__':
set_start_method("fork") # enforce fork
pooling()
输出
[1998 MainProcess] Start
[ 0 2020 Process-1] int: 0
[ 2 2020 Process-1] int: 1
[ 1 2021 Process-2] int: 0
[ 3 2021 Process-2] int: 2
run #0 finished
run #1 finished
concurrent.futures.process._RemoteTraceback:
"""
Traceback (most recent call last):
File "/usr/lib/python3.6/concurrent/futures/process.py", line 175, in _process_worker
r = call_item.fn(*call_item.args, **call_item.kwargs)
File "/usr/lib/python3.6/concurrent/futures/process.py", line 153, in _process_chunk
return [fn(*args) for args in chunk]
File "/usr/lib/python3.6/concurrent/futures/process.py", line 153, in <listcomp>
return [fn(*args) for args in chunk]
File "<stdin>", line 14, in inprocess
Exception: [ 2 2020 Process-1] Variable already set!
"""
The above exception was the direct cause of the following exception:
Traceback (most recent call last):
File "<stdin>", line 29, in <module>
File "<stdin>", line 24, in pooling
File "/usr/lib/python3.6/concurrent/futures/process.py", line 366, in _chain_from_iterable_of_lists
for element in iterable:
File "/usr/lib/python3.6/concurrent/futures/_base.py", line 586, in result_iterator
yield fs.pop().result()
File "/usr/lib/python3.6/concurrent/futures/_base.py", line 425, in result
return self.__get_result()
File "/usr/lib/python3.6/concurrent/futures/_base.py", line 384, in __get_result
raise self._exception
Exception: [ 2 2020 Process-1] Variable already set!
这个行为也可以用max_workers=1重现,因为这个过程是
重复使用。 start-method 对错误没有影响(尽管只有"fork"
似乎使用了多个进程)。
总结一下:我希望每次新运行都包含所有以前的数据,但是 没有来自任何其他运行的新数据。那可能吗?我将如何实现 它?为什么上面不完全做到这一点?
感谢您的帮助。
我找到multiprocessing.pool.Pool 可以设置maxtasksperchild=1,所以
工作进程在其任务完成时被销毁。但我不喜欢
multiprocessing接口; ProcessPoolExecutor 更舒服
采用。此外,池的整个想法是节省进程设置时间,
在每次运行后终止托管进程时,这将被解除。
【问题讨论】:
标签: python python-3.x python-multiprocessing concurrent.futures