【问题标题】:Maintain instance state in multiprocessing apply_async在多处理 apply_async 中维护实例状态
【发布时间】:2020-08-29 16:24:45
【问题描述】:

我希望如果我在实例方法中调用 apply_async 并获得其结果,那么所做的任何更改都将保留为分叉进程的一部分。但是,似乎每次对 apply_async 的新调用都会创建所述实例的新副本。

取以下代码:

from multiprocessing.pool import Pool


class Multitest:
    def __init__(self):
        self.i = 0

    def run(self):
        with Pool(2) as pool:
            worker_jobs = []
            for j in range(10):
                job = pool.apply_async(self.process, (j,))
                worker_jobs.append(job)

            for job in worker_jobs:
                res = job.get()
                print("input", res)

    def process(self, inp):
        print("i", self.i)
        self.i += 1

        return inp

if __name__ == '__main__':
    mt = Multitest()
    mt.run()

样本输出:

i 0
i 0
i 0
i 0
i 0
input 0
i 0
i 0
i 0
i 0
i 0
input 1
input 2
input 3
input 4
input 5
input 6
input 7
input 8
input 9

但由于我们有两个核心,其中分布着 10 个输入,所以我预计 i 属性会增加。

我预计会出现以下流程:

  • 主线程创建实例并调用run()
  • 主线程通过初始化两个新进程和原始 Multitest 实例的副本(其中i = 0)将apply_async 的工作分配到池中
  • 在新进程上多次调用process()(直到range()用尽)。在每次调用进程时,该进程的self.i 都会递增

注意:我询问两个进程之间的共享状态。相反,我问的是为什么单个进程的类实例没有发生变异(为什么每个单独进程的 self.i 没有增加)。

但是,我没有看到这种行为。相反,打印输出只有零,表明我的预期是错误的:状态(属性i)没有被维护,但每次调用apply_async 时都会创建一个新实例(或至少一个新副本)。我在这里缺少什么,我怎样才能使这项工作按预期进行? (最好使用apply_async,虽然不是必需的。但应保持结果的顺序。)

据我所知,这种行为并非特定于 apply_async,也适用于其他 pool 方法。我有兴趣了解为什么会发生这种情况以及如何将行为更改为我想要实现的行为。赏金会找到可以同时回答这两个问题的答案。

【问题讨论】:

  • 一般来说,您知道 Python 中的多处理是如何工作的吗?更重要的是,每个新进程是父进程的一个分支,它拥有自己的状态副本,而不是共享状态
  • @gold_cy 我是,但这与那不同。我不是在询问进程之间的共享状态,而是同一进程的一个类实例是否保持不变(具有相同的、已修改的属性)。
  • 在多处理中,参数被腌制,转移到另一个进程,然后取消腌制。当调用async_apply 时,函数接收的是参数的副本。要在进程之间同步状态,请尝试使用 multiprocessing.SyncManager 之类的管理器或创建自己的管理器。或者创建代理对象multiprocessing.managers.BaseProxy。毕竟,您可能更喜欢根据结果更新实例。 :)
  • @Aaron 请重新阅读帖子,尤其是注释。这个问题与进程之间的共享无关。
  • @BramVanroy 前两句话已经回答了原因。要实现您提到的行为,请考虑创建类似于threading.local 的进程本地存储。使用模块来存储进程状态,因为模块确实是进程本地的。 multiprocess.Pool 的参数initializer 可能会有帮助。

标签: python oop python-multiprocessing


【解决方案1】:

我想向您指出参考资料,但我还没有,所以我将根据经验证据分享我的想法:

每次调用 apply_async 都会准备一个新的命名空间副本。您可以通过在进程内添加对print(self) 的调用来查看这一点。所以这部分是不正确的:

主线程分配工作......通过初始化两个新进程和 原始 Multitest 实例的副本

相反,有两个新进程和 10 个原始 Multitest 实例的副本。所有这些副本都是从主进程制作的,它的 i 副本没有增加。为了证明这一点,在调用apply_async之前添加time.sleep(1); self.i += 1,并注意a)主线程中i的值增加,b)通过延迟for循环,原来的Multitest实例在下一次的时候已经改变了调用 apply_async 会触发一个新副本。

代码:

from multiprocessing.pool import Pool
import time

class Multitest:
    def __init__(self):
        print("Creating new Multitest instance: {}".format(self))
        self.i = 0

    def run(self):
        with Pool(2) as pool:
            worker_jobs = []
            for j in range(4):
                time.sleep(1); self.i += 1
                job = pool.apply_async(self.process, (j,))
                worker_jobs.append(job)

            for job in worker_jobs:
                res = job.get()
                print("input", res)

    def process(self, inp):
        print("i", self.i)
        print("Copied instance: {}".format(self))
        self.i += 1

        return inp

if __name__ == '__main__':
    mt = Multitest()
    mt.run()

结果:

Creating new Multitest instance: <__main__.Multitest object at 0x1056fc8b0>
i 1
Copied instance: <__mp_main__.Multitest object at 0x101052d90>
i 2
Copied instance: <__mp_main__.Multitest object at 0x101052df0>
i 3
Copied instance: <__mp_main__.Multitest object at 0x101052d90>
input 0
input 1
input 2
i 4
Copied instance: <__mp_main__.Multitest object at 0x101052df0>
input 3

关于您的第二个查询,我认为如果您希望在流程中维护状态,您可能只需要提交一项工作。 Pool(2) 不是处理 10 个独立的作业,而是让 Pool(2) 处理 2 个独立的作业,每个作业由 5 个相互依赖的子作业组成。或者,如果你真的想要 10 个作业,你可以使用由 pid 索引的共享数据结构,这样在单个进程中(按顺序)运行的所有作业都可以操作 i 的单个副本。

这是一个共享数据结构的示例,采用模块中的全局形式:

from multiprocessing.pool import Pool
from collections import defaultdict
import os
import myglobals # (empty .py file)

myglobals.i = defaultdict(lambda:0)

class Multitest:
    def __init__(self):
        pid = os.getpid()
        print("Creating new Multitest instance: {}".format(self))
        print("i {} (pid: {})".format(myglobals.i[pid], pid))

    def run(self):
        with Pool(2) as pool:
            worker_jobs = []
            for j in range(4):
                job = pool.apply_async(self.process, (j,))
                worker_jobs.append(job)

            for job in worker_jobs:
                res = job.get()
                print("input", res)

    def process(self, inp):
        pid = os.getpid()
        print("Copied instance: {}".format(self))
        print("i {} (pid: {})".format(myglobals.i[pid], pid))
        myglobals.i[pid] += 1

        return inp

if __name__ == '__main__':
    mt = Multitest()
    mt.run()

结果:

Creating new Multitest instance: <__main__.Multitest object at 0x1083f3880>
i 0 (pid: 3460)
Copied instance: <__mp_main__.Multitest object at 0x10d89cdf0>
i 0 (pid: 3463)
Copied instance: <__mp_main__.Multitest object at 0x10d89ce50>
Copied instance: <__mp_main__.Multitest object at 0x10550adf0>
i 0 (pid: 3462)
Copied instance: <__mp_main__.Multitest object at 0x10550ae50>
i 1 (pid: 3462)
i 1 (pid: 3463)
input 0
input 1
input 2
input 3

此技术来自https://stackoverflow.com/a/1676328/361691

【讨论】:

    【解决方案2】:

    我相信正在发生以下情况:

    1. 每次调用self.process,该方法都会被序列化(pickled)并发送到子进程。每次都会创建一个新副本。
    2. 该方法在子进程中运行,但由于它是单独副本的一部分,与父进程中的原始不同,因此其更改的状态不会也不会影响父进程。唯一传回的信息是返回值(也是腌制的)。

    请注意,子进程没有自己的Multitest 实例,因为它仅在__name__ == '__main__' 不适用于池创建的fork 时创建。

    如果要在子进程中维护状态,可以使用全局变量来实现。您可以在创建池时传递初始化器参数来初始化此类变量。

    以下显示了您想要的工作版本(但没有 OOP,它不适用于多处理):

    from multiprocessing.pool import Pool
    
    
    def initialize():
        global I
        I = 0
    
    
    def process(inp):
        global I
        print("I", I)
        I += 1
        return inp
    
    
    if __name__ == '__main__':
        with Pool(2, initializer=initialize) as pool:
            worker_jobs = []
            for j in range(10):
                job = pool.apply_async(process, (j,))
                worker_jobs.append(job)
    
            for job in worker_jobs:
                res = job.get()
                print("input", res)
    

    【讨论】:

      【解决方案3】:

      多处理和线程之间的一个区别是,在创建进程后,它使用的内存实际上是从它的父进程克隆出来的,因此进程之间没有共享内存。

      这是一个例子:

      import os
      import time
      from threading import Thread
      
      global_counter = 0
      
      def my_thread():
          global global_counter
          print("in thread, global_counter is %r, add one." % global_counter)
          global_counter += 1
      
      def test_thread():
          global global_counter
          th = Thread(target=my_thread)
          th.start()
          th.join()
          print("in parent, child thread joined, global_counter is %r now." % global_counter)
      
      def test_fork():
          global global_counter
          pid = os.fork()
          if pid == 0:
              print("in child process, global_counter is %r, add one." % global_counter)
              global_counter += 1
              exit()
          time.sleep(1)
          print("in parent, child process died, global_counter is still %r." % global_counter)
      
      def main():
          test_thread()
          test_fork()
      
      if __name__ == "__main__":
          main()
      

      输出:

      in thread, global_counter is 0, add one.
      in parent, child thread joined, global_counter is 1 now.
      in child process, global_counter is 1, add one.
      in parent, child process died, global_counter is still 1.
      

      在你的情况下:

      for j in range(10):
          # Before fork, self.i is 0, fork() dups memory, so the variable is not shared to the child.
          job = pool.apply_async(self.process, (j,))
          # After job finishes, child's self.i is 1 (not parent's), this variable is freed after child dies.
          worker_jobs.append(job)
      

      编辑:

      在 python3 酸洗中,绑定的方法也将包括对象本身,本质上是复制它。因此,每次调用 apply_async 时,对象 self 也会被腌制。

      import os
      from multiprocessing.pool import Pool
      import pickle
      
      class Multitest:
          def __init__(self):
              self.i = "myattr"
      
          def run(self):
              with Pool(2) as pool:
                  worker_jobs = []
                  for j in range(10):
                      job = pool.apply_async(self.process, (j,))
                      worker_jobs.append(job)
      
                  for job in worker_jobs:
                      res = job.get()
                      print("input", res)
      
          def process(self, inp):
              print("i", self.i)
              self.i += "|append"
      
              return inp
      
      def test_pickle():
          m = Multitest()
          print("original instance is %r" % m)
      
          pickled_method = pickle.dumps(m.process)
          assert b"myattr" in pickled_method
      
          unpickled_method = pickle.loads(pickled_method)
          # get instance from it's method (python 3)
          print("pickle duplicates the instance, new instance is %r" % unpickled_method.__self__)
      
      if __name__ == '__main__':
          test_pickle()
      

      输出:

      original instance is <__main__.Multitest object at 0x1072828d0>
      pickle duplicates the instance, new instance is <__main__.Multitest object at 0x107283110>
      

      【讨论】:

      • 对不起,这不能回答问题。问题不是关于进程之间的共享状态,而是同一进程中的假定相同实例,其属性未更新。请参阅 OP 的评论和其他答案。
      • 每个进程加一(0 -> 1):
      • 请重新阅读我的 OP 并尝试使用代码 sn-p 你会明白我的意思。
      • 如果你的意思是为什么输出i0s而不是1s,打印语句是在赋值之前,只是为了说清楚。我的理解是您期望输出 is 的范围为 0 到 9,对吗?
      • 正确。或者,好吧,至少应该有变量的连续递增(可能是一个进程比另一个进程快一点,因此不能保证两者都在 0-9 范围内)。
      猜你喜欢
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      相关资源
      最近更新 更多