【发布时间】:2020-08-29 16:24:45
【问题描述】:
我希望如果我在实例方法中调用 apply_async 并获得其结果,那么所做的任何更改都将保留为分叉进程的一部分。但是,似乎每次对 apply_async 的新调用都会创建所述实例的新副本。
取以下代码:
from multiprocessing.pool import Pool
class Multitest:
def __init__(self):
self.i = 0
def run(self):
with Pool(2) as pool:
worker_jobs = []
for j in range(10):
job = pool.apply_async(self.process, (j,))
worker_jobs.append(job)
for job in worker_jobs:
res = job.get()
print("input", res)
def process(self, inp):
print("i", self.i)
self.i += 1
return inp
if __name__ == '__main__':
mt = Multitest()
mt.run()
样本输出:
i 0
i 0
i 0
i 0
i 0
input 0
i 0
i 0
i 0
i 0
i 0
input 1
input 2
input 3
input 4
input 5
input 6
input 7
input 8
input 9
但由于我们有两个核心,其中分布着 10 个输入,所以我预计 i 属性会增加。
我预计会出现以下流程:
- 主线程创建实例并调用
run() - 主线程通过初始化两个新进程和原始 Multitest 实例的副本(其中
i = 0)将apply_async的工作分配到池中 -
在新进程上多次调用
process()(直到range()用尽)。在每次调用进程时,该进程的self.i都会递增
注意:我不询问两个进程之间的共享状态。相反,我问的是为什么单个进程的类实例没有发生变异(为什么每个单独进程的 self.i 没有增加)。
但是,我没有看到这种行为。相反,打印输出只有零,表明我的预期是错误的:状态(属性i)没有被维护,但每次调用apply_async 时都会创建一个新实例(或至少一个新副本)。我在这里缺少什么,我怎样才能使这项工作按预期进行? (最好使用apply_async,虽然不是必需的。但应保持结果的顺序。)
据我所知,这种行为并非特定于 apply_async,也适用于其他 pool 方法。我有兴趣了解为什么会发生这种情况以及如何将行为更改为我想要实现的行为。赏金会找到可以同时回答这两个问题的答案。
【问题讨论】:
-
一般来说,您知道 Python 中的多处理是如何工作的吗?更重要的是,每个新进程是父进程的一个分支,它拥有自己的状态副本,而不是共享状态
-
@gold_cy 我是,但这与那不同。我不是在询问进程之间的共享状态,而是同一进程的一个类实例是否保持不变(具有相同的、已修改的属性)。
-
在多处理中,参数被腌制,转移到另一个进程,然后取消腌制。当调用
async_apply时,函数接收的是参数的副本。要在进程之间同步状态,请尝试使用multiprocessing.SyncManager之类的管理器或创建自己的管理器。或者创建代理对象multiprocessing.managers.BaseProxy。毕竟,您可能更喜欢根据结果更新实例。 :) -
@Aaron 请重新阅读帖子,尤其是注释。这个问题与进程之间的共享无关。
-
@BramVanroy 前两句话已经回答了原因。要实现您提到的行为,请考虑创建类似于
threading.local的进程本地存储。使用模块来存储进程状态,因为模块确实是进程本地的。multiprocess.Pool的参数initializer可能会有帮助。
标签: python oop python-multiprocessing