【问题标题】:Multiprocessing Pool not getting all cores多处理池未获得所有内核
【发布时间】:2020-07-02 10:27:16
【问题描述】:

我的代码需要一些并行化,为此我使用了 Python 的 multiprocessing 模块,特别是 Pool 类。发生并行化的代码的相关部分看起来像这样

import multiprocessing as mp
import numpy as np

@jit( nopython=True )
def numba_product( a, b ):
        
    a_len = len(a)
    b_len = len(b)
    n     = len( a[0,:] )    
    c_res   = np.empty( (  a_len*b_len, n ), dtype=np.complex128 ) 
    c_count = 0  
    for i in range(a_len):
        for j in range( b_len ):            
            c_res[ c_count , : ] = np.multiply( a[i,:], b[ j, : ]  )          
            c_count += 1
            
    return c_res

def do_some_computations( shared_object, index ):

    d  = shared_object.get_dictionary_1()
            
    some_numpy_array_1 = shared_object.get_numpy_array_1( index ) #this gets a numpy array from 
                                                                  # shared object attribute, i.e.,
                                                                  # from shared_object class 
                                                                  # definition, the method returns
                                                                  # "self.the_array" attribute that
                                                                  # belongs to shared object, see 
                                                                  # dummy version of class definition 
                                                                  # below            
    mask_array_1       = shared_object.get_mask_array_1() # this gets a mask for the specified array        
    filtered_array_1   = some_numpy_array_1[ mask_array_1] #note that this defines a local new array, 
                                                           # but shouldn't modify some_numpy_array_1 
                                                           # ( I believe ) 
    
    s_keys             = shared_object.get_keys_for_index( index ) #gets the keys corresponding to 
                                                                   #that index to create a new array        
    
    v   = np.array( [ d1[ x ] for x in  s_keys  ] )

    final_result = numba_product( filtered_array_1, v )  # 
   

def pool_worker_function( index, args ):    
    shared_object = args[0] 
    result = do_some_computations( shared_object, index ) 
    return result    
        
    
def parallel_exec( shared_object, N ):
    number_processors      = mp.cpu_count()
    number_used_processors = number_processors - 1

    #try pool map method with a READ-ONLY object that is "shared_object".
    # This object contains two big dictionaries from which values are retrieved, 
    # and various NumPy arrays of considerable dimension           
    from itertools import repeat      

    pool    = mp.Pool( processes = number_used_processors )       
     
    a = list( np.linspace( 0, N, N ) )          
    
    args = ( shared_object, )     
    number_tasks = number_used_processors  
      
    n_chunck = int( ( len(a) - 1 )/number_tasks )
             
    result = pool.starmap( pool_worker_function, zip( a, repeat( args ) ), chunksize = n_chunck)              
    pool.close()        
    pool.join()           
    return result

问题

我遇到的问题是,当我在 Unix 操作系统下运行它时,在 32 核系统上,我只观察到少数内核正在并行化......据我了解,Unix 提供自动os.fork()作为copy-on-write,这意味着如果我的shared_object在调用期间没有被修改,那么并行化应该在没有额外内存消耗的情况下发生,并且所有核心应该单独执行它们的任务?这是我在程序到达并行化部分时看到的快照:

这些让我感到困惑,我确保 cpu.count() 提供的内核总数为 32。我观察到的另一件事是,在整个并行化过程中,可用内存量从 ~84 GiB 持续减少可用于 ~59 GiB。这可能暗示每个进程都在创建“shared_object”类的副本,因此制作了该类包含的所有字典和 NumPy 数组的副本。我想绕过这个问题;我想使用所有内核进行并行化,但老实说,我不知道这里发生了什么。

该代码预计将在 32 核的 Unix 机器上运行,但我自己的笔记本电脑有 Windows 操作系统,这是我在启动它时在 Windows 上看到的快照(尽管就我所阅读的内容而言,Windows 确实如此不支持os.fork()方法,所以我猜内存消耗高也就不足为奇了?)。

如您所见,对操作系统(红色)的调用占用了非常高的 CPU 使用率。在上面显示的 Linux 案例快照中,情况似乎也是如此。

最后,我想强调一下“shared_object”类的形式如下:

class shared_object():

    def __init__(): pass
    
    def store_dictionaries_and_arrays( dict_1, dict_2, array_1, array_2, ...  ):
        
        self.dict_1 = dict_1
        self.dict_2 = dict_2
        self.array_1 = array_1
        # same for all other arguments passed
    def get_dictionary_1():
        return self.dict_1
    def get_numpy_array_1():
        return self.array_1

但是对于更多的属性,因此需要更多的“获取”方法。这是一个非常大的数据容器,因此我希望在执行并行化时没有它的副本,因为属性只能访问而不是修改,我在这里缺少什么?非常感谢任何帮助,这已经困扰我很长时间了......非常感谢!

【问题讨论】:

  • 我猜你的shared_object 被重复复制到每个子进程中。也许您可以在创建池时将其传递给initargs,或者使用Manager 在父级中仅拥有一个副本(这将序列化访问)。
  • 谢谢您的回答;是的,似乎是这样,但是为什么这只发生在某些 CPU 中而其他 CPU 保持空闲?如果在调用期间未修改 shared_object,则不应出现这种情况,并且所有 CPU 都应该运行
  • python 怎么知道shared_object 没有被修改?您没有在代码中的任何地方说过,并且确定这基本上需要解决halting problem。如果您知道它是只读的并且只关心在 Unix 中运行,我想您可以在创建 Pool 之前将其放入全局变量中,然后让子进程从那里获取它
  • 据我了解,在 os.fork() 多处理中,所有子进程都会“默认”继承父内存的副本。我的问题更多地涉及类中的某些操作如何导致修改;例如,字典赋值 d = class.get_dict() 使用该方法,我会假设这不会改变对象。但是,很难判断 Python 是否“可能”将此赋值解释为可能导致任何修改的东西。您建议在代码中初始化对象之前编写“global shared_object”对吗?

标签: python parallel-processing python-multiprocessing


【解决方案1】:

根据您的 cmets,我认为您只想做这样的事情:

def pool_worker_function(index, args):
    return do_some_computations(_shared_hack, index)

def parallel_exec(shared_object, N):
    global _shared_hack
    _shared_hack = shared_object

    # it'll use ncores processes by default
    with mp.Pool() as pool:
        return pool.map(pool_worker_function, range(N))

即将shared_object 保存在全局某处,并让子进程在需要时将其拾取。

你做了很多奇怪的设置,我已经去掉了这些设置,包括设置一个在任何地方都没有使用过的 chuncks 列表。我也改用range,因为您也在使用list(np.linspace(0, N, N)) 来设置一些似乎损坏的索引。例如,N=4 会给你[0, 1.333, 2.667, 4] 这看起来不像是我想要索引数组的东西

【讨论】:

  • 感谢详细的回复,对代码中的网格感到抱歉;我忘了删除一些不必要的部分。在发生多处理的模块中将 _shared_hack 声明为全局,这如何防止所有不同进程对对象的多次访问?如果我理解正确,现在应该只有一个属于“全局”的对象实例,我可以期望这里有共享内存吗?如果代码确实在某种意义上修改了类怎么办? (不应该)那这不是有点问题吗?
  • 也许读过fork的语义。每个孩子都有自己的(可变的)一切副本,当进行更改时,他们只是与父母不同。因此父进程看不到任何子进程中所做的任何更改,并且子进程看不到父进程在分叉后所做的任何更改
  • 这实际上解决了问题,现在所有CPU都进行计算,谢谢!
猜你喜欢
  • 2015-05-26
  • 1970-01-01
  • 2021-04-22
  • 2022-01-18
  • 1970-01-01
  • 2019-03-12
  • 1970-01-01
  • 2011-02-16
  • 2017-03-06
相关资源
最近更新 更多