【问题标题】:Filling a class member data array using multiprocessing使用多处理填充类成员数据数组
【发布时间】:2022-01-06 01:46:30
【问题描述】:

我有一个类,里面有一个浮点数组(它可以是 np.array 甚至是 python 列表,这并不重要)。

from multiprocessing import Pool, RawArray
import numpy as np

class ClassTest:
    def __init__(self, a_N):
        self.N = a_N
        self.arr = np.zeros(a_N, float)

数组不一定很大,但可能很大。然而,为了填充self.arr[i] 的每一个元素,需要大量的工作,即打开文件、读取和解析数据、计算复杂的函数等,因此,我想使用 threading 或 multiprocessing 模块。前者受python全局解释器锁的影响,基本上所有线程一个接一个地运行,而不是并行运行,因此不提供任何计算加速。后者似乎适合这项工作,但是,self.arr 然后需要以某种方式在进程之间共享。我试过这段代码(都是类的方法)。

    def fillAll(self):
        Np = 4
        Nc = self.N//Np
        pool = Pool(processes=Np)
        for i in range(self.Np):
            i0 = i*Nc
            i1 = i0 + Nc
            pool.apply_async(self.fillChunk, (i0, i1, ))
        pool.close()
        pool.join()

    def fillChunk(self, a_start, a_finish):
        for i in range(a_start, a_finish):
            self.arr[i] = computeOneCell(i)
  

computeOneCell(i) 是一个外部函数,它完成所有工作并返回一个浮点数。就我从 Internet 上的帮助中获得的信息而言,这段代码不起作用,因为每个进程都有自己的类实例副本,并且分叉进程完成的所有操作都不会影响父进程。我还尝试使用 RawArray,根据documentation,它被用作共享内存。但是,当我将 self.arr = np.zeros(a_N, float) 替换为 self.arr_X = RawArray('d', a_N) 时,代码不起作用。它不会崩溃,但也不会进入fillChunk 函数,就好像从未调用过pool.apply_async(self.fillChunk, (i0, i1, ))

当不涉及类时,我有一个解决此问题的方法,即 RawArray 和 pool 在全局范围内声明,但我不知道如何使用类。

【问题讨论】:

    标签: python arrays class multiprocessing


    【解决方案1】:

    您真的不希望多个线程都对同一个数组进行更改。而且您不能真正让多个进程以简单的方式执行此操作。

    我会做类似的事情

    def fillAll(self):
        with Pool() as pool:
            for index, value in pool.imap(valueForCellAtIndex, range(self.N), chunksize=10):
                self.arr[index] = value
    
    def valueForCellAtIndex(self, index):
        return index, computeOneCell(index)
    

    您甚至可以使用imap_unordered 做得更好。通过同时返回来自fillChunk 的索引和值,您不需要按顺序返回结果。

    您可能想尝试一下chunksize,看看什么能给您带来最好的结果。

    【讨论】:

    • 感谢您的回复。乍一看,您提供的代码似乎有效。不过,我现在更疑惑了。例如,当您调用self.arr[index] = value 时,它是所有进程的共享数组还是数组的一组副本最终以某种方式合并为一个?
    • 其他进程永远看不到数组。唯一的数组在主线程中。它询问其他线程什么值进入每个索引,但只有主线程拥有实际的数组,以便它可以存储值。所有fillCell 都知道它应该提供一个给定索引的值——我可能不应该将函数命名为fillCell,而是valueForIndex,因为它实际上并没有填充单元格。 (我正在编辑代码,因为我认为这更清晰一些。)
    • 现在很有意义!但是我们需要一个由进程共享的数组,例如,如果fillCell 方法需要知道数组中相邻单元格的一些信息。基本上,我试图找出一个在共享内存上以类方法包装的并行程序的一般结构,类似于我们在 C/C++ 中使用 POSIX 线程可以做的事情。
    • 您可能想查看multiprocessing.Valuemultiprocessing.Array,它们允许您跨进程共享内存。请注意,跨线程和进程共享内存非常容易出错,并且很容易编写不正确的代码和竞争条件。这就是为什么我在上面故意避免它。
    猜你喜欢
    • 2017-09-11
    • 2017-06-30
    • 1970-01-01
    • 2023-03-25
    • 2022-01-22
    • 2020-12-27
    • 2016-10-01
    • 1970-01-01
    • 2016-08-02
    相关资源
    最近更新 更多