【问题标题】:Python multiprocessing: Shared memory (numpy) Array not being modifed as expectedPython多处理:共享内存(numpy)数组未按预期修改
【发布时间】:2016-04-25 20:43:11
【问题描述】:

我用 Python 编写了一个小型多处理程序,它读取一个值数组并异步运行多个进程以对数据数组的某些部分进行操作。每个单独的进程应该有自己的二维数组的一维部分,进程之间没有重叠。一旦所有进程完成,共享内存数组将被写入文件,但在我的代码中,共享内存数组中不存在预期/计算值,但原始值仍然存在。似乎在进程中分配新值并没有坚持到共享内存对象。也许有一些我不理解的事情(例如通过引用传递与通过值传递)正在导致我的麻烦?

我有一个处理器类,它创建许多工作进程并实例化一个 JoinableQueue。每个进程调用的函数对二维共享内存数组的索引切片进行操作并就地更新这些数组值,因此输入(共享内存)数组应将所有值替换为计算结果,因此应该不需要为结果设置第二个数组。主函数传递共享内存数组和一个索引值作为计算函数的参数,这些被添加到一个队列中,进程对象将从该队列中消耗工作项。代码如下:

class Processor:

    queue = None

    def __init__(self, 
                 number_of_workers=1):

        # create a joinable queue
        self.queue = JoinableQueue()

        # create the processes
        self.processes = [Process(target=self.compute) for _ in range(number_of_workers)]
        for p in self.processes:
            p.start()

    def add_work_item(self, item):

        # add the parameters list to the parameters queue
        self.queue.put(item)

    def compute(self):

        while True:

            # get a list of arguments from the queue
            arguments = self.queue.get()

            # if we didn't get one we keep looping
            if arguments is None:
                break

            # process the arguments here
            data = arguments[0]
            index = arguments[1] 

            # only process non-empty grid cells, i.e. data array contains at least some non-NaN values
            if (isinstance(data[:, index], np.ma.MaskedArray) and data[:, index].mask.all()) or np.isnan(data[:, index]).all():

                pass         

            else:  # we have some valid values to work with

                logger.info('Processing latitude: {}'.format(index))

                # perform a fitting to gamma     
                results = do_something(data[:, index])

                # update the shared array
                data[:, index] = results

            # indicate that the task has completed
            self.queue.task_done()

    def terminate(self):

        # terminate all processes
        for p in self.processes:
            p.terminate()

    def wait_on_all(self):

        #wait until queue is empty
        self.queue.join()

#-----------------------------------------------------------------------------------------------------------------------
if __name__ == '__main__':

    try:

        # log some timing info, used later for elapsed time 
        start_datetime = datetime.now()
        logger.info("Start time:    {}".format(start_datetime, '%x'))

        # get the command line arguments
        input_file = sys.argv[1]
        input_var_name = sys.argv[2]
        output_file_base = sys.argv[3]
        month_scale = int(sys.argv[4])

        # create the variable name from the indicator, distribution, and month scale
        variable_name = 'spi_gamma_{}'.format(str(month_scale).zfill(2))

        # open the NetCDF files
        with netCDF4.Dataset(input_file) as input_dataset, \
            netCDF4.Dataset(output_file_base + '_' + variable_name + '.nc', 'w') as output_dataset:

            # read info from the input dataset and initialize the output for writing

            # create a processor with a number of worker processes
            number_of_workers = 1
            processor = Processor(number_of_workers)

            # for each longitude slice
            for lon_index in range(lon_size):

                logger.info('\n\nProcessing longitude: {}\n'.format(lon_index))

                # read the longitude slice into a data array     
                longitude_slice = input_dataset.variables[input_var_name][:, lon_index, :]

                # reshape into a 1-D array
                original_shape = longitude_slice.shape
                flat_longitude_slice = longitude_slice.flatten()

                # convert the array onto a shared memory array which can be accessed from within another process
                shared_array_base = Array(ctypes.c_double, flat_longitude_slice)
                shared_array = np.ctypeslib.as_array(shared_array_base.get_obj())
                shared_array = shared_array.reshape(original_shape)

                # loop over each latitude point in the longitude slice
                for lat_index in range(lat_size):

                    # have the processor process the shared array at this index
                    arguments = [shared_array, lat_index]
                    processor.add_work_item(arguments)

                # join to the processor and don't continue until all processes have completed
                processor.wait_on_all()


                # write the fitted longitude slice values into the output NetCDF
                output_dataset.variables[variable_name][:, lon_index, :] = np.reshape(shared_array, (time_size, 1, lat_size))

            # all processes have completed
            processor.terminate()

    except Exception, e:
        logger.error('Failed to complete', exc_info=True)
        raise

谁能看到我哪里出错了,即为什么共享内存数组的值没有像我预期的那样更新?

提前感谢任何 cmets 或建议。

更新:我现在可以为单个进程工作,但是当我尝试生成多个进程时,我收到了一个 pickle 错误:

pickle.PicklingError: Can't pickle '_subprocess_handle' object: <_subprocess_handle object at 0x00000000021CF9F0>

这发生在第二个进程启动时,在 Processor.init() 函数中。如果我使用单个进程 (number_of_workers = 1) 运行以下代码,那么我不会遇到此错误并且我的代码按预期运行,尽管没有使用多个处理器,这是一个目标。

class Processor:

    queue = None

    def __init__(self, 
                 shared_array,
                 data_shape,
                 number_of_workers=1):

        # create a joinable queue
        self.queue = JoinableQueue()

        # keep reference to shared memory array
        self.shared_array = shared_array
        self.data_shape = data_shape

        # create the processes
        self.processes = [Process(target=self.compute_indicator) for _ in range(number_of_workers)]
        for p in self.processes:
            p.start()

    def add_work_item(self, item):

        # add the parameters list to the parameters queue
        self.queue.put(item)

    def compute_indicator(self):

        while True:

            # get a list of arguments from the queue
            arguments = self.queue.get()

            # if we didn't get one we keep looping
            if arguments is None:
                break

            # process the arguments here
            index = arguments[0] 

            # turn the shared array into a numpy array     
            data = np.ctypeslib.as_array(self.shared_array)
            data = data.reshape(self.data_shape)

            # only process non-empty grid cells, i.e. data array contains at least some non-NaN values
            if (isinstance(data[:, index], np.ma.MaskedArray) and data[:, index].mask.all()) \
                or np.isnan(data[:, index]).all() or (data[:, index] < 0).all():

                pass         

            else:  # we have some valid values to work with

                logger.info('Processing latitude: {}'.format(index))

                # perform computation     
                fitted_values = do_something(data[:, index])

                # update the shared array
                data[:, index] = fitted_values

            # indicate that the task has completed
            self.queue.task_done()

    def terminate(self):

        # terminate all processes
        for p in self.processes:
            p.terminate()

    def wait_on_all(self):

        #wait until queue is empty
        self.queue.join()

#-----------------------------------------------------------------------------------------------------------------------
if __name__ == '__main__':

            # create a shared memory array which can be accessed from within another process
            shared_array_base = Array(ctypes.c_double, time_size * lat_size, lock=False)

            # create a processor with a number of worker processes
            number_of_workers = 4
            data_shape = (time_size, lat_size)
            processor = Processor(shared_array_base, data_shape, number_of_workers)

            # for each longitude slice
            for lon_index in range(lon_size):

                logger.info('\n\nProcessing longitude: {}\n'.format(lon_index))

                # get the shared memory array and convert into a numpy array with proper dimensions
                longitude_array = np.ctypeslib.as_array(shared_array_base)
                longitude_array = np.reshape(longitude_array, data_shape)

                # read the longitude slice into the shared memory array     
                longitude_array[:] = input_dataset.variables[input_var_name][:, lon_index, :]

                # a list of arguments we'll map to the processes of the pool
                arguments_iterable = []

                # loop over each latitude point in the longitude slice
                for lat_index in range(lat_size):

                    # have the processor process the shared array at this index
                    processor.add_work_item([lat_index])

                # join to the processor and don't continue until all processes have completed
                processor.wait_on_all()

                # get the longitude slice of fitted values from the shared memory array and convert  
                # into a numpy array with proper dimensions which we can then use to write to NetCDF
                fitted_array = np.ctypeslib.as_array(shared_array_base)
                fitted_array = np.reshape(fitted_array, (time_size, 1, lat_size))

                # write the longitude slice of computed values into the output NetCDF
                output_dataset.variables[variable_name][:, lon_index, :] = fitted_array

            # all processes have completed
            processor.terminate()

【问题讨论】:

  • 为什么不使用multiprocessing.Pool 中的map() 方法而不是自己编写?
  • 只是一个想法,但您正在创建Process-es 创建共享数组之前。您确定 Process-es 实际上可以访问共享数组吗?可能他们只是得到一份副本......
  • shared_array_base 应作为参数传递给目标 compute 方法。实际上,对于 POSIX 系统,它只需要通过 fork 继承,但对于 Windows 支持,它需要作为参数来允许 mmap 共享内存的名称和状态被腌制并通过管道传输到子进程。您可以在每个工作进程中将共享数组包装为 NumPy 数组;不要像你现在做的那样腌制和传送一份副本给每个工人。
  • 我想要一个循环,在该循环中,我只将经度数据片读取到共享内存数组中,并且所有进程都应该在作为参数提供给 compute() 函数的索引处对共享数组进行操作.一旦所有进程都完成了它们的 compute() 函数,那么共享内存数组会在下一个经度切片处重新读取,并且用于处理新切片的更多工作项(切片的每个纬度点一个)被添加到队列中,理解为进程将使用这些新的工作项并在切片的这些点上进行计算,依此类推。
  • 当我使用池时出现 pickle 错误:cPickle.PicklingError: Can't pickle : 属性查找 multiprocessing.sharedctypes.c_double_Array_71400 失败。当作为参数传递给函数时,似乎无法腌制共享内存数组。

标签: python arrays numpy multiprocessing ctypes


【解决方案1】:

我现在已经成功实现了一个解决方案,尽管它仍然表现出意外行为:1)它在 Windows 环境中的所有 CPU 上运行,但该进程的总运行时间并不比运行单个处理器作业快(即相同没有任何 multiprocessing.* 用法的代码),以及 2)当我在 Linux 环境(虚拟容器)上运行代码时,我只看到四分之一的 CPU 被使用。无论如何,我现在有一个使用共享内存数组的工作代码,这就是最初的问题。如果有人能看出我哪里出错了,这导致了上面提到的两个问题,那么请在 cmets 中跟进。

def compute(args):

    # extract the arguments
    lon_index = args[0]
    lat_index = args[1]

    # NOT SHOWN
    # get the data_shape value

    # turn the shared array into a numpy array
    data = np.ctypeslib.as_array(shared_array)
    data = data.reshape(data_shape)

    # perform the computation, update the indexed array slice
    data[:, lon_index, lat_index] = perform_computation(data[:, lon_index, lat_index])  

def init_process(array):

    # put the arguments to the global namespace  
    global shared_array
    shared_array = array


if __name__ == '__main__':

    # NOT SHOWN
    # get the lat_size, lon_size, time_size, lon_stride, and data_shape values

    # create a shared memory array which can be accessed from within another process
    shared_array = Array(ctypes.c_double, time_size * lon_stride * lat_size, lock=False)
    data_shape = (time_size, lon_stride, lat_size)

    # create a processor with a number of worker processes
    number_of_workers = cpu_count()

    # create a Pool, essentially forking with copies of the shared array going to each pooled/forked process
    pool = Pool(processes=number_of_workers, 
                initializer=init_process, 
                initargs=(shared_array))

    # for each slice
    for lon_index in range(0, lon_size, lon_stride):

        # convert the shared memory array into a numpy array with proper dimensions
        slice_array = np.ctypeslib.as_array(shared_array)
        slice_array = np.reshape(slice_array, data_shape)

        # read the longitude slice into the shared memory array     
        slice_array[:] = read_data(data_shape)

        # a list of arguments we'll map to the processes of the pool
        arguments_iterable = []

        # loop over each latitude point in the longitude slice
        for lat_index in range(lat_size):

            for i in range(lon_stride):

                # have the processor process the shared array at this index
                arguments = [i, lat_index]
                arguments_iterable.append(arguments)

                # map the arguments iterable to the compute function
                pool.map(compute, arguments_iterable)

                # get the longitude slice of fitted values from the shared memory array and convert  
                # into a numpy array with proper dimensions which we can then use to write to NetCDF
                fitted_array = np.ctypeslib.as_array(shared_array)
                fitted_array = np.reshape(fitted_array, (time_size, lon_stride, lat_size))

                # NOT SHOWN
                # write the slice of computed values to file

            # all processes have completed, close the pool
            pool.close()

【讨论】:

    猜你喜欢
    • 2018-10-20
    • 2021-03-19
    • 1970-01-01
    • 1970-01-01
    • 2011-12-15
    • 2015-03-15
    • 1970-01-01
    • 2021-09-23
    相关资源
    最近更新 更多