【问题标题】:python shared memory with poolpython与池共享内存
【发布时间】:2021-09-24 10:03:21
【问题描述】:

我正在尝试在 python 的多处理中使用 shared_memory 和池。

Documentation,关于shared memory,参数buf(内存视图)我不清楚(可能是因为我不理解内存视图的概念——它是一个指针吗?)。
我想在不同的进程中使用这个共享内存。以下,我的示例基于文档:

a = np.array([1, 1, 2, 3, 5, 8])  
shm = shared_memory.SharedMemory(create=True, size=a.nbytes)

# Do I need to create the existing_shm or I can keep using shm?
existing_shm = shared_memory.SharedMemory(name=shm.name)

现在是我的第一个问题。我定义了将使用共享内存中的数组的函数:

def test_function(Input):
    c = np.ndarray(a.shape, dtype=np.int64, buffer=existing_shm.buf)
    c[1]=100
    print(c)

这是不正确的,但我不知道应该如何。

然后是主要的。是否有主要功能来完成这项工作?

if __name__=='__main__':
    with Pool(os.cpu_count()) as p:
        p.map(test_function, range(12))

它不起作用。 我是否必须在每个流程中定义c?或者我可以在主要定义它并在所有进程中使用它?我假设c 是一个python 对象,因此由于gil-lock 而不能被进程共享?

非常感谢!

【问题讨论】:

    标签: python-3.x multiprocessing shared-memory


    【解决方案1】:

    这行得通。不过,我还没有清楚地了解所有事实。

    1- 共享内存对象被声明:
    shm = shared_memory.SharedMemory(create=True, size=10000000*4).

    2- 使用缓冲区声明一个(本例中为 numpy 数组)对象,如下所示:
    b = np.ndarray((10000000,), dtype=np.int32, buffer=shm.buf).

    3- numpy 数组通过将数据复制到其中来填充。
    b[:] = np.random.randint(100, size=10000000, dtype=np.int32)

    那么,在很多cpus中需要执行的函数就是共享内存对象的名称,而函数内部提到的第二步就是映射共享内存,之前已经填充了。

    您必须在访问共享对象后close 并在最后unlink

    import numpy as np
    from multiprocessing import shared_memory, Pool
    import os
    
    
    def test_function(args): 
        Input, shm_name, size = args
        existing_shm = shared_memory.SharedMemory(name=shm_name)
        d = np.ndarray(size, dtype=np.int32, buffer=existing_shm.buf)
        #print(Input, d[Input-1:Input+2])
        d[Input]=-20
        #print(Input, d[Input-1:Input+2])
        existing_shm.close()
        print(Input, 'parent process:', os.getppid())
        print(Input, 'process id:', os.getpid())
    
    
    if __name__=='__main__':
        
        shm = shared_memory.SharedMemory(create=True, size=10000000*4)
        b = np.ndarray((10000000,), dtype=np.int32, buffer=shm.buf)
        b[:] = np.random.randint(100, size=10000000, dtype=np.int32)
    
        inputs =[[    1,shm.name,b.shape],
        [    2,shm.name,b.shape],
        [    3,shm.name,b.shape],
        [    4,shm.name,b.shape],
        [    5,shm.name,b.shape],
        [    6,shm.name,b.shape],
        [    7,shm.name,b.shape],
        [    8,shm.name,b.shape],
        [    9,shm.name,b.shape],
        [    10,shm.name,b.shape],
        [    11,shm.name,b.shape],
        [    12,shm.name,b.shape],
        [13,shm.name,b.shape]]
    
        with Pool(os.cpu_count()) as p:
            p.map(test_function, inputs)
     
        print(b[:20])
        
        # Clean up from within the first Python shell
        shm.close()
        shm.unlink()  # Free and release the shared memory block at the very end
    

    【讨论】:

      猜你喜欢
      • 2014-10-08
      • 1970-01-01
      • 2021-01-17
      • 2021-12-04
      • 2016-06-22
      • 2021-05-26
      • 2016-12-13
      • 2018-02-02
      • 2012-12-11
      相关资源
      最近更新 更多