【问题标题】:Multiprocessing, what does pool.ready do?多处理,pool.ready 是做什么的?
【发布时间】:2016-08-05 21:22:22
【问题描述】:

假设我在一个类中有一个包含几个进程的池,用于进行一些处理,如下所示:

class MyClass:

    def __init_(self):
        self.pool = Pool(processes = NUM_PROCESSES)
        self.pop = []
        self.finished = []

    def gen_pop(self):
        self.pop = [ self.pool.apply_async(Item.test, (Item(),)) for _ in range(NUM_PROCESSES) ]
        while (not self.check()):
            continue
        # Do some other stuff

    def check(self):
        self.finished = filter(lambda t: self.pop[t].ready(), range(NUM_PROCESSES))
        new_pop = []
        for f in self.finished:
            new_pop.append(self.pop[f].get(timeout = 1))
            self.pop[f] = None
            # Do some other stuff

当我运行这段代码时,我得到一个cPickle.PicklingError,它指出<type 'function'> 不能被腌制。这告诉我的是 apply_async 函数之一尚未返回,因此我试图将正在运行的函数附加到另一个列表。但这不应该发生,因为应该使用 ready() 函数过滤掉所有正在运行的调用。

在相关说明中,Item 类的实际性质并不重要,但重要的是,在我的 Item.test 函数的顶部,我有一个 print 语句,它应该出于调试目的而触发。但是,这不会发生。这告诉我该函数已启动但尚未真正开始执行。

那么,ready() 似乎并没有真正告诉我调用是否已完成执行。 ready() 到底是做什么的,我应该如何编辑我的代码以便过滤掉仍在运行的进程?

【问题讨论】:

  • 你为什么认为cPickle.PicklingError是这个意思?
  • @zwol 我真的不知道。相反,我认为该错误是实际问题的症状,我的问题中对此进行了详细说明。
  • 我的意思是我认为你对实际问题的诊断是错误的。

标签: python multiprocessing pool


【解决方案1】:

Multiprocessing 在内部使用pickle 模块在进程之间传递数据, 所以你的数据必须是picklable。请参阅the list of what is considered picklable,对象方法不在该列表中。
要快速解决这个问题,只需在方法周围使用一个包装函数:

def wrap_item_test(item):
    item.test()

class MyClass:
    def gen_pop(self):
        self.pop = [ self.pool.apply_async(wrap_item_test, (Item(),)) for _ in range(NUM_PROCESSES) ]
        while (not self.check()):
            continue

【讨论】:

  • 那么,我是否将wrap_item_test 放在包含Item 的模块的顶部?或者还有什么我需要知道的?
【解决方案2】:

为了回答您提出的问题,.ready() 真的在告诉您.get() 是否可能 阻止:如果.ready() 返回True.get()不会 em> 块,但如果 .ready() 返回 False.get() 可能 块(或者它可能不会:异步调用很可能会在您调用 .get() 之前完成)。

因此,例如,.get() 中的 timeout = 1 没有任何作用:因为您仅在 .ready() 返回 True 时调用 .get(),所以您已经知道 .get() 不会阻塞.

.get() 不阻塞并不暗示异步调用成功,甚至工作进程甚至开始处理异步调用:正如文档所说,

如果远程调用引发异常,那么 get() 将重新引发该异常。

也就是说,例如,如果异步调用根本无法执行.ready() 将返回 True.get() 将(重新)引发阻止尝试的异常从工作中。

这似乎是你的情况,尽管我们不得不猜测,因为你没有发布可运行的代码,也没有包含回溯。

请注意,如果您真正想知道的是异步调用是否正常完成,在已经从.ready() 得到True 之后,那么.successful() 就是要调用的方法。

很明显,无论Item.test 是什么,由于pickle 限制,完全不可能将其作为可调用对象传递给.apply_async()。这就解释了为什么Item.test 从不打印任何东西(它实际上从未被调用!),为什么.ready() 返回True.apply_async() 调用失败),以及为什么.get() 引发异常(因为.apply_async() 遇到异常同时试图腌制它的一个论点——可能是Item.test)。

【讨论】:

    猜你喜欢
    • 1970-01-01
    • 2019-10-07
    • 2010-09-24
    • 2013-08-13
    • 1970-01-01
    • 2013-11-27
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多