【问题标题】:使用多处理池的 apply_async 方法时谁运行回调?
【发布时间】:2014-09-06 09:36:11
【问题描述】:

在使用多处理池的 apply_sync 方法时,我试图了解幕后发生的事情。

谁运行回调方法?是调用apply_async的主进程吗?

假设我发送了一大堆带有回调的 apply_async 命令,然后继续执行我的程序。当 apply_async 开始到结束时,我的程序仍在做事。当主进程仍在忙于脚本时,回调如何运行我的“主进程”?

这是一个例子。

import multiprocessing
import time

def callback(x):
    print '{} running callback with arg {}'.format(multiprocessing.current_process().name, x)

def func(x):
    print '{} running func with arg {}'.format(multiprocessing.current_process().name, x)
    return x

pool = multiprocessing.Pool()

args = range(20)

for a in args:
    pool.apply_async(func, (a,), callback=callback)

print '{} going to sleep for a minute'.format(multiprocessing.current_process().name)

t0 = time.time()
while time.time() - t0 < 60:
    pass

print 'Finished with the script'

输出类似于

PoolWorker-1 使用 arg 0 运行 func

PoolWorker-2 使用 arg 1 运行 func

PoolWorker-3 使用 arg 2 运行 func

MainProcess 将要休眠一分钟

PoolWorker-4 使用 arg 3 运行 func

PoolWorker-1 使用 arg 4 运行 func

PoolWorker-2 使用 arg 5 运行 func

PoolWorker-3 使用 arg 6 运行 func

PoolWorker-4 使用 arg 7 运行 func

MainProcess 运行回调,参数为 0

MainProcess 使用 arg 1 运行回调

MainProcess 使用 arg 2 运行回调

MainProcess 使用 arg 3 运行回调

MainProcess 使用 arg 4 运行回调

PoolWorker-1 使用 arg 8 运行 func

...

完成脚本

MainProcess 如何在 while 循环中间运行回调??

multiprocessing.Pool 的文档中有关于回调的声明,这似乎是一个提示,但我不明白。

apply_async(func[, args[, kwds[, callback]]])

apply() 方法的变体,它返回一个结果对象。

如果指定了回调,那么它应该是一个接受单个参数的可调用对象。当结果准备就绪时,将对其应用回调(除非调用失败)。回调应该立即完成,否则处理结果的线程将被阻塞。

【问题讨论】:

    标签: python callback parallel-processing multiprocessing


    【解决方案1】:

    文档中确实有提示:

    回调应该立即完成,因为 否则线程 处理结果将被阻止。

    回调在主进程中处理,但它们在自己的单独线程中运行。当您创建 Pool 时,它实际上会在内部创建一些 Thread 对象:

    class Pool(object):
        Process = Process
    
        def __init__(self, processes=None, initializer=None, initargs=(),
                     maxtasksperchild=None):
            self._setup_queues()
            self._taskqueue = Queue.Queue()
            self._cache = {}
            ... # stuff we don't care about
            self._worker_handler = threading.Thread(
                target=Pool._handle_workers,
                args=(self, )
                )
            self._worker_handler.daemon = True
            self._worker_handler._state = RUN 
            self._worker_handler.start()
    
            self._task_handler = threading.Thread(
                target=Pool._handle_tasks,
                args=(self._taskqueue, self._quick_put, self._outqueue,
                      self._pool, self._cache)
                )
            self._task_handler.daemon = True
            self._task_handler._state = RUN 
            self._task_handler.start()
    
            self._result_handler = threading.Thread(
                target=Pool._handle_results,
                args=(self._outqueue, self._quick_get, self._cache)
                )
            self._result_handler.daemon = True
            self._result_handler._state = RUN
            self._result_handler.start()
    

    对我们来说有趣的话题是_result_handler;我们很快就会知道原因。

    切换一秒钟,当您运行 apply_async 时,它会在内部创建一个 ApplyResult 对象来管理从子节点获取结果:

    def apply_async(self, func, args=(), kwds={}, callback=None):
        assert self._state == RUN
        result = ApplyResult(self._cache, callback)
        self._taskqueue.put(([(result._job, None, func, args, kwds)], None))
        return result
    
    class ApplyResult(object):
    
        def __init__(self, cache, callback):
            self._cond = threading.Condition(threading.Lock())
            self._job = job_counter.next()
            self._cache = cache
            self._ready = False
            self._callback = callback
            cache[self._job] = self
    
    
        def _set(self, i, obj):
            self._success, self._value = obj
            if self._callback and self._success:
                self._callback(self._value)
            self._cond.acquire()
            try:
                self._ready = True
                self._cond.notify()
            finally:
                self._cond.release()
            del self._cache[self._job]
    

    如您所见,假设任务成功,_set 方法最终实际执行传入的callback。另请注意,它会将自身添加到 __init__ 末尾的全局 cache 字典中。

    现在,回到_result_handler 线程对象。该对象调用_handle_results 函数,如下所示:

        while 1:
            try:
                task = get()
            except (IOError, EOFError):
                debug('result handler got EOFError/IOError -- exiting')
                return
    
            if thread._state:
                assert thread._state == TERMINATE
                debug('result handler found thread._state=TERMINATE')
                break
    
            if task is None:
                debug('result handler got sentinel')
                break
    
            job, i, obj = task
            try:
                cache[job]._set(i, obj)  # Here is _set (and therefore our callback) being called!
            except KeyError:
                pass
    
            # More stuff
    

    这是一个循环,它只是从队列中提取子节点的结果,在cache 中找到它的条目,然后调用_set,它会执行我们的回调。即使您处于循环中,它也可以运行,因为它没有在主线程中运行。

    【讨论】:

    • 感谢 Dano 抽出宝贵时间写出如此详细的回复!如果我理解正确,池会创建一个 single 新线程(result_handler),其工作就是等待 apply_async 完成,然后调用 result_handler 线程中的回调(它是主进程)。回调(对于单个池对象)是否会按顺序调用? IE。一堆 apply_async 可能一起完成,但回调将由 result_handler 串行运行?
    • 还有一个问题。如果回调函数和主脚本都与相同的对象(在 MainProcess 中)混淆怎么办?会不会有不可预知的行为? IE。如果回调和主脚本中稍后的内容都尝试写入同一个文件或修改同一个数组。当回调实际运行时,谁知道那时主脚本会做什么。
    • @Alex 是的,回调将按顺序执行。 _result_handler 线程将一个已完成的任务从队列中拉出,调用 _set(它运行回调),然后继续执行下一个任务。这就是为什么文档说要确保回调立即完成;执行回调会阻止处理其他结果。
    • @Alex 您绝对需要担心您在回调中更改的任何对象的线程安全。一般来说,我建议在回调中尽可能少做,但如果你绝对需要接触共享状态,你必须用某种互斥锁来保护它。
    猜你喜欢
    • 1970-01-01
    • 2020-10-30
    • 1970-01-01
    • 2018-11-23
    • 2013-01-26
    • 1970-01-01
    • 2021-07-14
    • 2021-06-20
    • 1970-01-01
    相关资源
    最近更新 更多