引用:https://zhuanlan.zhihu.com/p/32513483

 

共享 numpy 数组

需要用到 numpy 时往往是数据量较大的场景,如果直接复制会造成大量内存浪费。共享 numpy 数组则是通过上面一节的 Array 实现,再用 numpy.frombuffer 以及 reshape 对共享的内存封装成 numpy 数组,代码如下:

# encoding:utf8
import ctypes
import os
import multiprocessing

import numpy as np

NUM_PROCESS = multiprocessing.cpu_count()


def worker(index):
    main_nparray = np.frombuffer(shared_array_base, dtype=ctypes.c_double)
    main_nparray = main_nparray.reshape(NUM_PROCESS, 10)
    pid = os.getpid()
    main_nparray[index, :] = pid
    return pid


if __name__ == "__main__":
    shared_array_base = multiprocessing.Array(
        ctypes.c_double, NUM_PROCESS * 10, lock=False)
    pool = multiprocessing.Pool(processes=NUM_PROCESS)
    result = pool.map(worker, range(NUM_PROCESS))
    main_nparray = np.frombuffer(shared_array_base, dtype=ctypes.c_double)
    main_nparray = main_nparray.reshape(NUM_PROCESS, 10)
    print( main_nparray )

 

运行结果:

Python多进程共享numpy 数组

 

 

 

 

===============================================================

 

 

 

 

多进程共享较大数据,如numpy数组的情况下我们需要使用multiprocessing下面的Value , Array从而实现多进程的共享,但是还有一个重要的问题就是数据的读写方式,由于CPython是在语言的数据结构上进行再次包装的,所以对于数据的读写是需要进行翻译的,也就是说对数据读写是需要对Python数据类型下对应的C类型的数据结构进行读写的,也正是因为这种数据读写方式所以对Python数据进行操作要比对C类型数据进行读写操作要慢上很多。

numpy数据的底层同样也是C类型的数据结构,同时numpy下面的数据操作很多都是可以直接对numpy类型下的底层数据结构来操作的,这样也就会省掉数据结构转换的时间花销,只要不把numpy数据转为Python类型数据,都是可以在numpy下对底层数据进行直接操作的。

虽然mulprocessing模块提供了共享数据类型,但是不同进程对共享数据的读写本身也会存在数据类型的转换。

用更直接的话来说,虽然mutprocessing提供了共享数据类型Value和Array,但是不同进程其实也是无法直接对其进行操作的,子进程如果要读取或写入共享数据Value和Array就需要将共享数据转为可以进行操作的Python数据类型或numpy数据类型,否则就难以直接对共享数据进行直接操作,这时候numpy.frombuffer函数就派上用场了,numpy.frombuffer函数可以直接读取Python数据类型、numpy数据类型和共享数据类型的底层数据类型,即C数据类型,这样的话使用numpy.frombuffer函数就会省去数据类型转换这一环节。numpy.frombuffer可以直接读取共享数据类型Value和Array,因为Value和Array的底层实现就是C数据类型。

 

下面给出几种多进程共享数据的读写方式代码,以验证最佳的多进程大数据量数据的共享方式。

运行环境介绍:

软件:Ubuntu18.04系统、python3.7.5

硬件:intel i7-8700 cpu,6物理核心12逻辑核心

 

 

1.  使用multiprocessing.Value / multiprocessing.Array + numpy.frombuffer方式:

(使用numpy.frombuffer 对数据的读写不需要类型的转换可以直接对数据进行读写操作)

 

import ctypes
import time
import multiprocessing
import numpy as np

NUM_PROCESS = multiprocessing.cpu_count()

size = 1000000


def worker(index):
    main_nparray = np.frombuffer(shared_array_base[index], dtype=ctypes.c_double)
    for i in range(10000):
        main_nparray[:] = index + i
    return index


if __name__ == "__main__":
    shared_array_base = []
    for _ in range(NUM_PROCESS):
        shared_array_base.append(multiprocessing.Array("d", size, lock=False))

    pool = multiprocessing.Pool(processes=NUM_PROCESS)

    a = time.time()
    result = pool.map(worker, range(NUM_PROCESS))
    b = time.time()
    print(b-a)
    #print(result)


    for i in range(NUM_PROCESS):
        main_nparray = np.frombuffer(shared_array_base[i], dtype=ctypes.c_double)
        print(main_nparray)
        print(type(main_nparray))
        print(main_nparray.shape)

    # 73.216189146
    # 73.2605750561
    # 73.3307318687
    # 73.4090409279
    # 73.4219110012
View Code

相关文章: