【问题标题】:Python process not cleaned for reusePython 进程未清理以供重用
【发布时间】:2018-11-30 21:28:45
【问题描述】:

流程未清理以供重复使用

你好,

我偶然发现了ProcessPoolExecutor 的问题,其中进程访问 数据,他们应该做不到。让我解释一下:

我遇到了类似于以下示例的情况:我有几次跑步要开始 每个都有不同的论据。他们并行计算他们的东西并且没有 互相交流的理由。现在,据我了解,当一个过程 叉子,它会自我复制。子进程具有相同的(内存)数据,如 它的父级,但如果它改变任何东西,它会在它自己的副本上这样做。如果我 希望更改能够在子进程的生命周期中存活下来,我会 调用队列、管道和其他 IPC 内容。

但实际上我没有!每个进程都为自己处理数据,这 不应延续到任何其他运行。下面的例子显示 否则,虽然。下一次运行(不是并行运行的)可以访问 他们之前运行的数据,暗示数据尚未被清理 从过程中。

代码/示例

from concurrent.futures import ProcessPoolExecutor
from multiprocessing import current_process, set_start_method

class Static:
    integer: int = 0

def inprocess(run: int) -> None:
    cp = current_process()
    # Print current state
    print(f"[{run:2d} {cp.pid} {cp.name}] int: {Static.integer}", flush=True)

    # Check value
    if Static.integer != 0:
        raise Exception(f"[{run:2d} {cp.pid} {cp.name}] Variable already set!")

    # Update value
    Static.integer = run + 1

def pooling():
    cp = current_process()
    # Get master's pid
    print(f"[{cp.pid} {cp.name}] Start")
    with ProcessPoolExecutor(max_workers=2) as executor:
        for i, _ in enumerate(executor.map(inprocess, range(4))):
            print(f"run #{i} finished", flush=True)

if __name__ == '__main__':
    set_start_method("fork")    # enforce fork
    pooling()

输出

[1998 MainProcess] Start
[ 0 2020 Process-1] int: 0
[ 2 2020 Process-1] int: 1
[ 1 2021 Process-2] int: 0
[ 3 2021 Process-2] int: 2
run #0 finished
run #1 finished
concurrent.futures.process._RemoteTraceback:
"""
Traceback (most recent call last):
  File "/usr/lib/python3.6/concurrent/futures/process.py", line 175, in _process_worker
    r = call_item.fn(*call_item.args, **call_item.kwargs)
  File "/usr/lib/python3.6/concurrent/futures/process.py", line 153, in _process_chunk
    return [fn(*args) for args in chunk]
  File "/usr/lib/python3.6/concurrent/futures/process.py", line 153, in <listcomp>
    return [fn(*args) for args in chunk]
  File "<stdin>", line 14, in inprocess
Exception: [ 2 2020 Process-1] Variable already set!
"""

The above exception was the direct cause of the following exception:

Traceback (most recent call last):
  File "<stdin>", line 29, in <module>
  File "<stdin>", line 24, in pooling
  File "/usr/lib/python3.6/concurrent/futures/process.py", line 366, in _chain_from_iterable_of_lists
    for element in iterable:
  File "/usr/lib/python3.6/concurrent/futures/_base.py", line 586, in result_iterator
    yield fs.pop().result()
  File "/usr/lib/python3.6/concurrent/futures/_base.py", line 425, in result
    return self.__get_result()
  File "/usr/lib/python3.6/concurrent/futures/_base.py", line 384, in __get_result
    raise self._exception
Exception: [ 2 2020 Process-1] Variable already set!

这个行为也可以用max_workers=1重现,因为这个过程是 重复使用。 start-method 对错误没有影响(尽管只有"fork" 似乎使用了多个进程)。


总结一下:我希望每次新运行都包含所有以前的数据,但是 没有来自任何其他运行的新数据。那可能吗?我将如何实现 它?为什么上面不完全做到这一点?

感谢您的帮助。


我找到multiprocessing.pool.Pool 可以设置maxtasksperchild=1,所以 工作进程在其任务完成时被销毁。但我不喜欢 multiprocessing接口; ProcessPoolExecutor 更舒服 采用。此外,池的整个想法是节省进程设置时间, 在每次运行后终止托管进程时,这将被解除。

【问题讨论】:

    标签: python python-3.x python-multiprocessing concurrent.futures


    【解决方案1】:

    python 中的全新进程不共享内存状态。但是ProcessPoolExecutor 重用流程实例。毕竟这是一个活动进程池。我认为这样做是为了防止操作系统一直在启动和启动进程。

    您会在 celery 等其他分发技术中看到相同的行为,如果您不小心,您可能会在执行之间泄露全局状态。

    我建议您更好地管理命名空间以封装数据。使用您的示例,您可以例如将代码和数据封装在您在inprocess() 中实例化的父类中,而不是将其存储在共享命名空间中,如类中的静态字段或直接存储在模块中。这样对象最终会被垃圾收集器清理掉:

    class State:
        def __init__(self):
            self.integer: int = 0
    
        def do_stuff():
            self.integer += 42
    
    def use_global_function(state):
        state.integer -= 1664
        state.do_stuff()
    
    def inprocess(run: int) -> None:
        cp = current_process()
        state = State()
        print(f"[{run:2d} {cp.pid} {cp.name}] int: {state.integer}", flush=True)
        if state.integer != 0:
            raise Exception(f"[{run:2d} {cp.pid} {cp.name}] Variable already set!")
        state.integer = run + 1
        state.do_stuff()
        use_global_function(state)
    

    【讨论】:

    • 谢谢。关于流程重用:我假设相同。关于您的建议:不幸的是,这对我不起作用。我必须找到一个例子,没有所有的实际开销。我依靠元类来减少我的 api,因此,我无法在 inprocess 调用中实例化这些东西。
    【解决方案2】:

    我一直面临一些可能类似的问题,并在High Memory Usage Using Python Multiprocessing 中看到了一些有趣的帖子,这些帖子指向使用 gc.collector(),但是在您的情况下它没有用。于是想到了Static类是怎么初始化的,有几点:

    1. 很遗憾,我无法重现您的最小示例值错误提示: ValueError:找不到“fork”的上下文
    2. 考虑到 1,我使用 set_start_method("spawn") 一个快速的解决方法是每次初始化静态类,如下所示:
    {
        class Static:
            integer: int = 0
            def __init__(self):
                pass
        
        def inprocess(run: int) -> None:
            cp = current_process()
            # Print current state
            print(f"[{run:2d} {cp.pid} {cp.name}] int: {Static().integer}", flush=True)
        
            # Check value
            if Static().integer != 0:
                raise Exception(f"[{run:2d} {cp.pid} {cp.name}] Variable already set!")
        
            # Update value
            Static().integer = run + 1
        
        
        def pooling():
            cp = current_process()
            # Get master's pid
            print(f"[{cp.pid} {cp.name}] Start")
            with ProcessPoolExecutor(max_workers=2) as executor:
                for i, _ in enumerate(executor.map(inprocess, range(4))):
                    print(f"run #{i} finished", flush=True)
        
        
        if __name__ == "__main__":
            print("start")
            # set_start_method("fork")  # enforce fork , ValueError: cannot find context for 'fork'
            set_start_method("spawn")    # Alternative
            pooling()
    }
    

    这会返回:

    [ 0 1424 SpawnProcess-2] int: 0
    [ 1 1424 SpawnProcess-2] int: 0
    run #0 finished
    [ 2 17956 SpawnProcess-1] int: 0
    [ 3 1424 SpawnProcess-2] int: 0
    run #1 finished
    run #2 finished
    run #3 finished
    

    【讨论】:

      猜你喜欢
      • 2016-02-15
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 2012-06-29
      • 1970-01-01
      相关资源
      最近更新 更多