【Question title】: Context managers and multiprocessing pools
【Posted】: 2014-07-12 21:25:25
【Question】:

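Suppose you are using a multiprocessing.Pool object, and you use the constructor's initializer setting to pass an initializer function that then creates a resource in the global namespace. Assume the resource has a context manager. How would you handle the lifecycle of the context-managed resource, given that it has to live for the whole life of the process but must be properly cleaned up at the end?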

So far, I have something like this:

resource_cm = None
resource = None


def _worker_init(args):
    global resource_cm, resource
    resource_cm = open_resource(args)
    resource = resource_cm.__enter__()

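From here on, the pool processes can use the resource. So far so good. Handling cleanup is trickier, though, because the multiprocessing.Pool class doesn't provide a destructor or deinitializer argument.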

One idea I had was to use the atexit module and register the cleanup in the initializer. Something like this:

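def _worker_init(args):
    global resource_cm, resource
    resource_cm = open_resource(args)
    resource = resource_cm.__enter__()

    def _clean_up():
        # __exit__ takes (exc_type, exc_value, traceback); all None means "no exception"
        resource_cm.__exit__(None, None, None)

    import atexit
    atexit.register(_clean_up)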

Is this a good approach? Is there a simpler way?

EDIT: atexit doesn't seem to work, at least not the way I use it above, so I still don't have a solution to this problem.
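A likely explanation, at least with the fork start method (the traditional default on Linux): the forked worker runs `_bootstrap()` and then leaves through `os._exit()`, and `os._exit()` skips `atexit` handlers. Other start methods end the child differently, so this may not be the whole story on every platform. The sketch reproduces just that difference in two fresh interpreters started with `subprocess`; `CHILD` and `run_child` are hypothetical names.

```python
import subprocess
import sys

# Program for the child interpreter: register an atexit handler, then either
# finish normally or leave through os._exit(), as a fork-based worker does.
CHILD = """
import atexit, os, sys
atexit.register(print, "atexit handler ran", flush=True)
if sys.argv[1] == "hard":
    os._exit(0)  # exits immediately: atexit handlers are skipped
"""


def run_child(mode):
    """Run CHILD in a fresh interpreter and return its stripped stdout."""
    done = subprocess.run([sys.executable, "-c", CHILD, mode],
                          capture_output=True, text=True, check=True)
    return done.stdout.strip()


if __name__ == "__main__":
    print(repr(run_child("normal")))  # 'atexit handler ran'
    print(repr(run_child("hard")))    # ''
```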

【Question discussion】:

    Tags: python multiprocessing contextmanager


    【Solution 1】:

    First, this is a really good question! After digging around in the multiprocessing code a bit, I think I've found a way to do it:

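    When you start a multiprocessing.Pool, the Pool object internally creates a multiprocessing.Process object for each member of the pool. When those child processes start up, they call the _bootstrap function, which looks like this: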

    def _bootstrap(self):
        from . import util
        global _current_process
        try:
            # ... (stuff we don't care about)
            util._finalizer_registry.clear()
            util._run_after_forkers()
            util.info('child process calling self.run()')
            try:
                self.run()
                exitcode = 0 
            finally:
                util._exit_function()
            # ... (more stuff we don't care about)
    

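    The run method is what actually runs the target you gave the Process object. For Pool processes, that is a method with a long-running while loop that waits for work items to arrive on an internal queue. What we're really interested in is what happens after self.run: util._exit_function() is called.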
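    To see why cleanup can only happen once `run()` returns, here is a stripped-down model of such a worker loop. It is not the real `multiprocessing.pool.worker`: a thread stands in for the worker process, `queue.Queue` for the internal queue, and the hypothetical `on_exit` callback for `util._exit_function()`. The loop only ends when it receives the `None` sentinel, which `Pool.close()` eventually sends once per worker.

```python
import queue
import threading

SENTINEL = None  # multiprocessing.pool.worker also stops on a None task


def toy_worker(tasks, results, on_exit):
    """Simplified model of a pool worker: loop until the sentinel, then clean up."""
    try:
        while True:
            task = tasks.get()
            if task is SENTINEL:
                break
            func, arg = task
            results.put(func(arg))
    finally:
        on_exit()  # plays the role of util._exit_function() in _bootstrap


def toy_pool_demo():
    tasks, results, log = queue.Queue(), queue.Queue(), []
    t = threading.Thread(target=toy_worker,
                         args=(tasks, results, lambda: log.append("exit")))
    t.start()
    for i in range(3):
        tasks.put((abs, -i))
    tasks.put(SENTINEL)  # roughly what Pool.close() leads to, once per worker
    t.join()
    return [results.get() for _ in range(3)], log
```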

    It turns out that util._exit_function() does some cleanup that sounds a lot like what you're looking for:

    def _exit_function(info=info, debug=debug, _run_finalizers=_run_finalizers,
                       active_children=active_children,
                       current_process=current_process):
        # NB: we hold on to references to functions in the arglist due to the
        # situation described below, where this function is called after this
        # module's globals are destroyed.
    
        global _exiting
    
        info('process shutting down')
        debug('running all "atexit" finalizers with priority >= 0')  # Very interesting!
        _run_finalizers(0)
    

    Here is the docstring of _run_finalizers:

    def _run_finalizers(minpriority=None):
        '''
        Run all finalizers whose exit priority is not None and at least minpriority
    
        Finalizers with highest priority are called first; finalizers with
        the same priority will be called in reverse order of creation.
        '''
    

    The method then iterates over the registered finalizer callbacks and runs them:

    items = [x for x in _finalizer_registry.items() if f(x)]
    items.sort(reverse=True)
    
    for key, finalizer in items:
        sub_debug('calling %s', finalizer)
        try:
            finalizer()
        except Exception:
            import traceback
            traceback.print_exc()
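    The ordering rule in that docstring comes from the registry keys: each key is `(exitpriority, creation counter)`, and sorting the keys in reverse puts higher priorities first and, within one priority, newer finalizers first. A small pure-Python model of that bookkeeping (`registry`, `register`, `run_finalizers` and `ordering_demo` are hypothetical names, not the real module's) shows the two passes `_exit_function` makes; the second, unfiltered `_run_finalizers()` pass is in the part of `_exit_function` not quoted above. It also bears on the `exitpriority=16` question in the comments: any integer `>= 0` runs in the first `_run_finalizers(0)` pass, and the exact value only orders it relative to other finalizers in the same process.

```python
import itertools

registry = {}  # hypothetical stand-in for util._finalizer_registry
_counter = itertools.count()


def register(callback, exitpriority):
    """Store callback under (exitpriority, creation counter), like Finalize."""
    registry[(exitpriority, next(_counter))] = callback


def run_finalizers(minpriority=None):
    """Like _run_finalizers: highest priority first, ties newest first."""
    def wanted(key):
        priority = key[0]
        if priority is None:
            return False
        return minpriority is None or priority >= minpriority

    for key in sorted((k for k in registry if wanted(k)), reverse=True):
        registry.pop(key)()


def ordering_demo():
    order = []
    register(lambda: order.append("a16"), 16)
    register(lambda: order.append("b0"), 0)
    register(lambda: order.append("c16"), 16)
    register(lambda: order.append("neg"), -5)
    run_finalizers(0)  # first pass: priorities >= 0 only
    first_pass = list(order)
    run_finalizers()   # second pass: everything left that has a priority
    return first_pass, order
```

`ordering_demo()` returns `(["c16", "a16", "b0"], ["c16", "a16", "b0", "neg"])`.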
    

    Perfect. So how do we get into _finalizer_registry? There is an undocumented object in multiprocessing.util called Finalize that is responsible for adding callbacks to the registry:

    class Finalize(object):
        '''
        Class which supports object finalization using weakrefs
        '''
        def __init__(self, obj, callback, args=(), kwargs=None, exitpriority=None):
            assert exitpriority is None or type(exitpriority) is int
    
            if obj is not None:
                self._weakref = weakref.ref(obj, self)
            else:
                assert exitpriority is not None
    
            self._callback = callback
            self._args = args
            self._kwargs = kwargs or {}
            self._key = (exitpriority, _finalizer_counter.next())
            self._pid = os.getpid()
    
            _finalizer_registry[self._key] = self  # That's what we're looking for!
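    `Finalize` can be tried directly in a single process. A minimal Python 3 sketch (`Resource` and `finalize_demo` are hypothetical): calling a `Finalize` object runs its callback once and unregisters it, and when `obj` is given, the weak reference triggers the callback as soon as `obj` is garbage-collected (on CPython, the moment its last reference disappears). `Finalize.__call__` also compares `os.getpid()` with the PID recorded at creation, so a finalizer only fires in the process that registered it, which is why the registration has to happen inside the worker's initializer.

```python
from multiprocessing.util import Finalize  # undocumented; may change between versions


class Resource:
    """Hypothetical resource; ordinary class instances support weak references."""


def finalize_demo():
    log = []

    # Without obj, exitpriority is mandatory; here the finalizer is called by hand.
    fin = Finalize(None, log.append, args=("explicit",), exitpriority=16)
    active_before = fin.still_active()
    fin()  # runs log.append("explicit") and removes itself from the registry
    fin()  # no longer registered, so nothing happens
    active_after = fin.still_active()

    # With obj, the weak reference fires the callback when obj is collected.
    res = Resource()
    Finalize(res, log.append, args=("collected",))
    del res  # on CPython the last reference goes away here

    return active_before, active_after, log
```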
    

    OK, so putting it all together in an example:

    import multiprocessing
    from multiprocessing.util import Finalize
    
    resource_cm = None
    resource = None
    
    class Resource(object):
        def __init__(self, args):
            self.args = args
    
        def __enter__(self):
            print("in __enter__ of %s" % multiprocessing.current_process())
            return self
    
        def __exit__(self, *args, **kwargs):
            print("in __exit__ of %s" % multiprocessing.current_process())
    
    def open_resource(args):
        return Resource(args)
    
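    def _worker_init(args):
        global resource_cm, resource
        print("calling init")
        resource_cm = open_resource(args)
        resource = resource_cm.__enter__()
        # Register a finalizer that calls __exit__ when this worker process exits.
        # __exit__ takes (exc_type, exc_value, traceback); Nones mean "no exception".
        Finalize(resource, resource_cm.__exit__, args=(None, None, None), exitpriority=16)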
    
    def hi(*args):
        print("we're in the worker")
    
    if __name__ == "__main__":
        pool = multiprocessing.Pool(initializer=_worker_init, initargs=("abc",))
        pool.map(hi, range(pool._processes))
        pool.close()
        pool.join()
    

    Output:

    calling init
    in __enter__ of <Process(PoolWorker-1, started daemon)>
    calling init
    calling init
    in __enter__ of <Process(PoolWorker-2, started daemon)>
    in __enter__ of <Process(PoolWorker-3, started daemon)>
    calling init
    in __enter__ of <Process(PoolWorker-4, started daemon)>
    we're in the worker
    we're in the worker
    we're in the worker
    we're in the worker
    in __exit__ of <Process(PoolWorker-1, started daemon)>
    in __exit__ of <Process(PoolWorker-2, started daemon)>
    in __exit__ of <Process(PoolWorker-3, started daemon)>
    in __exit__ of <Process(PoolWorker-4, started daemon)>
    

    As you can see, __exit__ gets called in every worker when we close() and join() the pool.
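    A Python 3 variant of the same idea, sketched under a few assumptions: a POSIX system (it asks for the `"fork"` start method explicitly), a hypothetical `@contextmanager`-based `open_resource` whose normal exit writes a marker file so the parent can check that cleanup ran, and the same undocumented `Finalize` helper. Two details differ from the class-based example: with `@contextmanager`, `__enter__()` returns the yielded value rather than the context manager, so the callback must be `resource_cm.__exit__` called with `(None, None, None)`; and the yielded dict cannot be weakly referenced, so `obj` is `None` and `exitpriority` becomes mandatory.

```python
import contextlib
import multiprocessing
import os
import tempfile
from multiprocessing.util import Finalize  # undocumented helper

resource_cm = None
resource = None


@contextlib.contextmanager
def open_resource(log_dir):
    """Hypothetical resource whose normal exit leaves a marker file."""
    yield {"pid": os.getpid()}
    # Only reached when __exit__ is called without an exception.
    open(os.path.join(log_dir, "closed-%d" % os.getpid()), "w").close()


def _worker_init(log_dir):
    global resource_cm, resource
    resource_cm = open_resource(log_dir)
    resource = resource_cm.__enter__()
    # obj=None: a dict can't be weakly referenced, so exitpriority must be given.
    Finalize(None, resource_cm.__exit__, args=(None, None, None), exitpriority=16)


def run_pool(n_workers=2):
    """Run a pool to completion; return the PIDs whose resource was closed."""
    ctx = multiprocessing.get_context("fork")  # assumption: POSIX system
    with tempfile.TemporaryDirectory() as log_dir:
        pool = ctx.Pool(n_workers, initializer=_worker_init, initargs=(log_dir,))
        pool.map(abs, range(10))  # builtin task: pickles without importing __main__
        pool.close()  # workers receive the stop sentinel and leave their loop
        pool.join()   # so run() returns and _exit_function() runs the finalizers
        return sorted(int(name.split("-")[1]) for name in os.listdir(log_dir))


if __name__ == "__main__":
    print(run_pool(2))
```

If you prefer `with ctx.Pool(...) as pool:`, call `pool.close()` and `pool.join()` inside the block: `Pool.__exit__` calls `terminate()`, and a terminated worker can be killed before its finalizers run.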

    【Comments】:

    • Could you explain how you determined the value of exitpriority, i.e. exitpriority=16 in your case?
    【Solution 2】:

    You can subclass Process and override its run() method so that it does the cleanup before exiting. Then subclass Pool so that it uses your Process subclass:

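    from multiprocessing import Process
    from multiprocessing.pool import Pool  # the class; multiprocessing.Pool is only a factory
    
    class SafeProcess(Process):
        """ Process that will cleanup before exit """
        def run(self, *args, **kw):
            try:
                return super().run(*args, **kw)
            finally:
                # cleanup however you want here
                pass
    
    
    class SafePool(Pool):
        # Recent Python 3 versions create workers via Pool.Process(ctx, *args, **kwds);
        # on older versions a plain `Process = SafeProcess` attribute was enough.
        @staticmethod
        def Process(ctx, *args, **kwds):
            return SafeProcess(*args, **kwds)
    
    
    if __name__ == "__main__":
        pool = SafePool(4)  # use it as standard Pool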
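    On Python 3.8 and later, `Pool` builds each worker by calling the static hook `Pool.Process(ctx, *args, **kwds)`, so a plain `Process = SafeProcess` class attribute receives `ctx` as its first positional argument (`group`), which `Process` rejects. A self-contained sketch that overrides the hook instead and checks that the cleanup really runs in each worker; it assumes a POSIX system (explicit `"fork"` context), and `LOG_DIR`, `cleanup` and `run_safe_pool` are hypothetical names.

```python
import multiprocessing
import os
import tempfile
from multiprocessing.pool import Pool

FORK_CTX = multiprocessing.get_context("fork")  # assumption: POSIX system
LOG_DIR = None  # set by the parent before the pool starts; fork copies it


def cleanup():
    """Hypothetical cleanup: leave a marker file named after this worker's PID."""
    open(os.path.join(LOG_DIR, "clean-%d" % os.getpid()), "w").close()


class SafeProcess(FORK_CTX.Process):
    """Process that runs cleanup() after run() returns or raises."""

    def run(self, *args, **kw):
        try:
            return super().run(*args, **kw)
        finally:
            cleanup()


class SafePool(Pool):
    @staticmethod
    def Process(ctx, *args, **kwds):
        # Called as Process(ctx, target=worker, args=...) on Python 3.8+.
        return SafeProcess(*args, **kwds)


def run_safe_pool(n_workers=2):
    """Run a SafePool to completion; return the PIDs whose cleanup() ran."""
    global LOG_DIR
    with tempfile.TemporaryDirectory() as log_dir:
        LOG_DIR = log_dir
        pool = SafePool(n_workers, context=FORK_CTX)
        pool.map(abs, range(10))  # builtin task: pickles without importing __main__
        pool.close()
        pool.join()  # workers leave their loop, run() returns, cleanup() runs
        return sorted(int(name.split("-")[1]) for name in os.listdir(log_dir))


if __name__ == "__main__":
    print(run_safe_pool(2))
```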
    

    【Comments】:

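    • @nattster I prefer this approach to the previous one, even though that one uses a built-in mechanism, because it's more elegant. But when I try it, I get TypeError: method expected 2 arguments, got 3 on the class SafePool(Pool): line. Any ideas?
    • @MilindR Which Python version? I'm using Python 2.
    • @MilindR, I think I know why you're getting this error. See my answer here: stackoverflow.com/a/70572544/3158248. Basically, multiprocessing.Pool is a function, so you need to subclass multiprocessing.pool.Pool instead.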
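    A quick way to check that explanation on Python 3, where `multiprocessing.Pool` is a bound method of the default context rather than a plain function: subclassing it fails while the class is being created, and the real class is `multiprocessing.pool.Pool`. The exact error message varies between versions, so the sketch only checks for `TypeError`; `subclassing_factory_fails` is a hypothetical helper.

```python
import multiprocessing
import multiprocessing.pool


def subclassing_factory_fails():
    """Return True if `class X(multiprocessing.Pool)` raises TypeError."""
    try:
        class BrokenPool(multiprocessing.Pool):  # a factory, not a class
            pass
    except TypeError:
        return True
    return False


if __name__ == "__main__":
    print(isinstance(multiprocessing.Pool, type))       # False
    print(isinstance(multiprocessing.pool.Pool, type))  # True
    print(subclassing_factory_fails())                  # True
```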
    【Solution 3】:

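    Here is the solution I came up with. It uses billiard, a fork of Python's multiprocessing package. It relies on the private API Worker._ensure_messages_consumed, so I would not recommend it for production. I only needed it for a side project, so it was good enough for me. Use it at your own risk.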

    from billiard import pool
    from billiard.pool import Pool, Worker
    
    class SafeWorker(Worker):
        # this function is called just before a worker process exits
        def _ensure_messages_consumed(self, *args, **kwargs):
            # Not necessary, but you can move `Pool.initializer` logic here if you want.
            out = super()._ensure_messages_consumed(*args, **kwargs)
            # Do clean up work here
            return out
    
    class SafePool(Pool):
        Worker = SafeWorker
    
    

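    Another solution I tried was implementing my cleanup logic as a signal handler, but that didn't work because both multiprocessing and billiard use exit() to end their worker processes. I'm not sure how atexit works, but that may be why that approach doesn't work either.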

    【Comments】:
