【问题标题】:How to fork and join multiple subprocesses with a global timeout in Python?如何在 Python 中使用全局超时分叉和加入多个子进程?
【发布时间】:2019-06-12 13:12:56
【问题描述】:

我想在多个子流程中并行执行一些任务,如果任务没有在一定延迟内完成,则超时。

第一种方法是分叉和连接子进程单独,剩余超时根据全局超时计算,如this answer 中建议的那样。对我来说效果很好。

我想在这里使用的第二种方法是创建一个子进程池并等待全局超时,如this answer 中所建议的那样。

但是我对第二种方法有一个问题:在向子进程池提供具有 multiprocessing.Event() 对象的任务后,等待它们完成会引发此异常:

RuntimeError: Condition 对象只能通过继承在进程之间共享

这里是 Python 代码 sn-p:

import multiprocessing.pool
import time


class Worker:

    def __init__(self):
        self.event = multiprocessing.Event()  # commenting this removes the RuntimeError

    def work(self, x):
        time.sleep(1)
        return x * 10


if __name__ == "__main__":
    pool_size = 2
    timeout = 5

    with multiprocessing.pool.Pool(pool_size) as pool:
        result = pool.map_async(Worker().work, [4, 5, 2, 7])
        print(result.get(timeout))  # raises the RuntimeError

【问题讨论】:

    标签: python multiprocessing timeout pool


    【解决方案1】:

    multiprocessing — 基于进程的并行性 文档的 "Programming guidlines" 部分中,有这样一段:

    比pickle/unpickle更好继承

    当使用 spawnforkserver 启动方法时,multiprocessing 中的许多类型需要是可挑选的,以便子进程可以使用它们。但是,通常应该避免使用管道或队列将共享对象发送到其他进程。相反,您应该安排程序,以便需要访问在其他地方创建的共享资源的进程可以从祖先进程继承它。

    所以multiprocessing.Event() 导致RuntimeError 因为它不可拾取,如以下 Python 代码 sn-p 所示:

    import multiprocessing
    import pickle
    
    pickle.dumps(multiprocessing.Event())
    

    引发同样的异常:

    RuntimeError: Condition 对象只能通过继承在进程之间共享

    解决方案是使用proxy object:

    代理是指一个共享对象的对象,该对象(可能)存在于不同的进程中。

    因为:

    代理对象的一个​​重要特性是它们是可挑选的,因此它们可以在进程之间传递。

    multiprocessing.Manager().Event() 创建一个共享的threading.Event() 对象并为其返回一个代理,因此替换此行:

    self.event = multiprocessing.Event()
    

    通过问题的Python代码sn-p中的以下行解决了问题:

    self.event = multiprocessing.Manager().Event()
    

    【讨论】:

      猜你喜欢
      • 1970-01-01
      • 1970-01-01
      • 2023-04-04
      • 2017-12-28
      • 2011-02-13
      • 2016-11-16
      • 2012-02-27
      • 2017-01-28
      相关资源
      最近更新 更多