【问题标题】:Sharing array of objects with Python multiprocessing使用 Python 多处理共享对象数组
【发布时间】:2020-11-13 23:30:24
【问题描述】:

对于这个问题,我参考example in Python docs 讨论“使用SharedMemory 类和NumPy 数组,从两个不同的Python shell 访问相同的numpy.ndarray”。

我想实现的一个主要变化是操作类对象数组而不是整数值,如下所示。

import numpy as np
from multiprocessing import shared_memory    

# a simplistic class example
class A(): 
    def __init__(self, x): 
        self.x = x

# numpy array of class objects 
a = np.array([A(1), A(2), A(3)])       

# create a shared memory instance
shm = shared_memory.SharedMemory(create=True, size=a.nbytes, name='psm_test0')

# numpy array backed by shared memory
b = np.ndarray(a.shape, dtype=a.dtype, buffer=shm.buf)                                    

# copy the original data into shared memory
b[:] = a[:]                                  

print(b)                                            

# array([<__main__.Foo object at 0x7fac56cd1190>,
#       <__main__.Foo object at 0x7fac56cd1970>,
#       <__main__.Foo object at 0x7fac56cd19a0>], dtype=object)

现在,在不同的 shell 中,我们附加到共享内存空间并尝试操作数组的内容。

import numpy as np
from multiprocessing import shared_memory

# attach to the existing shared space
existing_shm = shared_memory.SharedMemory(name='psm_test0')

c = np.ndarray((3,), dtype=object, buffer=existing_shm.buf)

即使在我们能够操作c 之前,打印它也会导致分段错误。事实上,我不能指望观察到尚未写入模块的行为,所以我的问题是可以如何处理共享的对象数组?

我目前正在挑选列表,但受保护的读/写会增加相当多的开销。我也尝试过使用Namespace,这很慢,因为不允许索引写入。另一个想法可能是在 ShareableList 中使用共享 Ctypes 结构,但我不知道从哪里开始。

此外还有一个设计方面:似乎shared_memory 中有一个open bug 可能会影响我的实现,其中我有多个进程处理数组的不同元素。

是否有一种更具可扩展性的方式在多个进程之间共享大量对象列表,以便在任何给定时间所有正在运行的进程都与列表中的唯一对象/元素进行交互?

更新:在这一点上,我也将接受部分答案,讨论这是否可以用 Python 实现。

【问题讨论】:

  • 当然,必须在正确的时间closeunlink 共享对象,但因为不完全相关而被忽略了。
  • 我想不出一个主意,所以我只想问:您的流程应该做什么类型的工作?我认为这不仅仅是一个简单的地图操作......
  • @AlexNe 简而言之:是的,这比地图更复杂;这个例子只是概念性的。如果您对更多细节感兴趣:每个进程都可以被认为是一个单独的“可编程单元”,所有这些都执行异步、完全分布式的算法来完成协调、内聚等。
  • 您是否出于任何特定原因使用 numpy 数组而不是列表? Numpy 数组可以与共享内存恶作剧一起使用,因为它们的实现方式可以直接与内存缓冲区一起使用。只要您有对象而不是其中的“数字”,此特权就会自动丢失。当多个线程攻击相同的对象时,CPython 尤其是 GIL 往往会出现异常行为。如果您想共享数字数组结构,那么我使用 numpy 原语。如果您想共享对象,我会告诉您考虑这些对象的具体细节/要求。

标签: python multiprocessing shared-memory python-3.8


【解决方案1】:

所以,我做了一些研究 (Shared Memory Objects in Multiprocessing) 并提出了一些想法:

传递 numpy 字节数组

序列化对象,然后将它们作为字节字符串保存到 numpy 数组中。这里的问题是

  1. 需要将数据类型从'psm_test0' 的创建者传递给'psm_test0' 的任何消费者。不过,这可以通过另一个共享内存来完成。

  2. pickleunpickle 本质上类似于deepcopy,即它实际上复制了基础数据。

“主”进程的代码如下:

import pickle
from multiprocessing import shared_memory
import numpy as np


# a simplistic class example
class A():
    def __init__(self, x):
        self.x = x

    def pickle(self):
        return pickle.dumps(self)

    @classmethod
    def unpickle(self, bts):
        return pickle.loads(bts)


if __name__ == '__main__':
    # Test pickling procedure
    a = A(1)
    print(A.unpickle(a.pickle()).x)
    # >>> 1

    # numpy array of byte strings
    a_arr = np.array([A(1).pickle(), A(2).pickle(), A('This is a really long test string which should exceed 42 bytes').pickle()])

    # create a shared memory instance
    shm = shared_memory.SharedMemory(
        create=True,
        size=a_arr.nbytes,
        name='psm_test0'
    )

    # numpy array backed by shared memory
    b_arr = np.ndarray(a_arr.shape, dtype=a_arr.dtype, buffer=shm.buf)

    # copy the original data into shared memory
    b_arr[:] = a_arr[:]

    print(b_arr.dtype)
    # 'S105'

为了消费者

import numpy as np
from multiprocessing import shared_memory
from test import A

# attach to the existing shared space
existing_shm = shared_memory.SharedMemory(name='psm_test0')

c = np.ndarray((3,), dtype='S105', buffer=existing_shm.buf)

# Test data transfer
arr = [a.x for a in list(map(A.unpickle, c))]
print(arr)
# [1, 2, ...]

我想说你有几种前进的方式:

  1. 使用简单的数据类型。

  2. 使用 C api 实现一些东西,但我不能真正帮助你。

  3. 使用Rust

  4. 使用Mangers。您可能会失去一些性能(不过我希望看到一个真正的基准测试),但是您可以获得一个相对安全且简单的共享对象接口。

  5. 使用Redis,它也有 Python 绑定...

【讨论】:

  • 虽然这不能解决我的确切问题,但这回答了问题中提出的问题。答案还链接了足够的资源,以便人们可以自己制定解决方案,因此我将其标记为已接受。谢谢!
猜你喜欢
  • 1970-01-01
  • 2023-04-03
  • 2023-03-10
  • 1970-01-01
  • 2017-02-07
  • 1970-01-01
  • 2021-03-19
  • 2019-10-09
  • 2022-10-15
相关资源
最近更新 更多