【问题标题】:multiprocessing.Queue doesn't work as instance variable in class?multiprocessing.Queue 不能用作类中的实例变量?
【发布时间】:2018-12-02 08:54:46
【问题描述】:

我想将一个多处理任务封装到一个类中。控制和工作函数都是该类的成员。工作人员使用Pool.map_async() 运行,因此可以在其他工作人员仍在运行时处理结果。处理结果存储在multiprocessing.Queue 中。 Queue是实例变量时不起作用,而全局变量或类变量则起作用。

例子:

import multiprocessing 
class A():
    # Queue as instance variable
    def __init__(self):
        self.qout = multiprocessing.Queue()
    def worker(self,x):
        self.qout.put(x*x)   
    def process(self):
        values = range(10)
        with multiprocessing.Pool() as pool:
            res = pool.map_async(self.worker,values)        
            while (not self.qout.empty() or
                not res.ready()):
                val = self.qout.get()
                print(val)

qoutB = multiprocessing.Queue()
class B():
    # Queue as global variable
    def __init__(self):
        pass   
    def worker(self,x):
        qoutB.put(x*x)       
    def process(self):
        values = range(10)       
        with multiprocessing.Pool() as pool:
            res = pool.map_async(self.worker,values)           
            while (not qoutB.empty() or
                not res.ready()):
                val = qoutB.get()
                print(val)

class C():
    # Queue as Class variable
    qout = multiprocessing.Queue()
    def __init__(self):
        pass
    def worker(self,x):
        self.qout.put(x*x)   
    def process(self):
        values = range(10)
        with multiprocessing.Pool() as pool:
            res = pool.map_async(self.worker,values)        
            while (not self.qout.empty() or
                not res.ready()):
                val = self.qout.get()
                print(val)  

现在,当您按如下方式调用类时(将其放在类定义下方)

a=A()
a.process()

不起作用(可能会停止等待self.qout.get(),但是

a=B()
a.process()

a=C()
a.process()

works(打印结果)。为什么?

我在Python documentation 中没有找到任何相关信息。我没有尝试将队列作为参数传递,但它是一个应该对用户隐藏的功能。

B 选项应该没有问题,C 并不理想,因为队列将在类的所有实例之间共享。

注意:这是在 Linux 上测试的(Debian,来自存储库的 Python 3.5)。

【问题讨论】:

  • A 类中,队列是一个实例属性,而不是一个其他的类变量——所以你使用了错误的术语。您应该尝试创建一个实际的类变量,看看会发生什么。
  • 抱歉术语有误,将解决问题。但是,如果队列是类变量,它会在所有实例之间进行分配,不是吗。这不是我们想要的行为。
  • 我明白了。如果没有其他问题,您应该尝试一下,看看是否存在相同的问题(即使您无法使用该技术)。
  • 好的,如果队列是类变量的话,它可以工作...
  • 这是在 Windows 上吗?另外——一如既往——“不起作用”是什么意思?

标签: python class queue multiprocessing


【解决方案1】:

SO 算法给了我有趣的提示,我之前找不到。

基于this answer,队列不能作为参数传递给正在打开新进程的函数,因为队列不能被腌制。这通常是self.function() 所做的:它相当于function(self)。对于A 类,尝试将队列传递给worker;在BC 中,它不是并且或多或少独立于流程

this question and answers 也得出同样的结论。不用说,manager.Queue 在这里也不起作用。

MCVE 测试失败

这可能是由于multiprocessing(see docs)的默认启动方式不同

【讨论】:

    【解决方案2】:

    同样,这不是您问题的答案。但是,我发布它是因为它使整个问题变得毫无意义——因为您实际上并不需要显式地创建和使用 multiprocessing.Queue 来执行此类操作。

    请考虑使用concurrent.futures.ProcessPoolExecutor 来完成任务。

    例如:

    import concurrent.futures
    
    class A_Prime():
        def __init__(self):
            pass
    
        def worker(self, x):
            return x*x
    
        def process(self):
            with concurrent.futures.ProcessPoolExecutor() as executor:
                classname = type(self).__name__
                print(classname, '- calling executor.map')
                res = [value for value in executor.map(self.worker, range(10))]
                print(classname, '- executor.map finished')
                print('  result:', res)
    
    
    if __name__ == '__main__':
        test = A_Prime()
        test.process()
        print('done')
    

    输出:

    A_Prime - calling executor.map
    A_Prime - executor.map finished
      result: [0, 1, 4, 9, 16, 25, 36, 49, 64, 81]
    done
    

    【讨论】:

    • 这可以异步工作吗?我试图摆弄队列的原因是,我的实际工作人员需要一些时间,而且人数众多,所以我想同时做一些事情。我必须测试更多multiprocessing.pool.AsyncResult
    • Hamiltonian:是的,我相信是的。链接文档的第一行说“ProcessPoolExecutor 类是 Executor 子类,它使用进程池异步执行调用。” (强调我的最后一句话)
    • 另外请注意,如果我的任何一个答案对您有所帮助,即使他们没有直接解决您的问题,您仍然可以对其进行投票。见What should I do when someone answers my question?
    • 好吧,Futures 确实很有趣。可惜你在谷歌搜索上找不到它。
    • 好的,我会投票赞成这个,但请接受我的回答,因为它解释了我相信的原因。无论如何感谢您的提示。
    猜你喜欢
    • 1970-01-01
    • 2017-09-26
    • 1970-01-01
    • 2011-07-02
    • 1970-01-01
    • 1970-01-01
    • 2022-01-05
    • 2014-09-21
    • 1970-01-01
    相关资源
    最近更新 更多