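This may just be my stubbornness showing, but I have always found that projects like celery pile a lot of complexity on top of multiprocessing (which is already complicated) and end up being more trouble than they are worth. In terms of speed and simplicity, there is also no better alternative to stdlib shared memory and mutexes, imo.
For your case, a simple solution would be to use one FIFO queue per process and have the producer put each frame into every queue. That naturally uses a lot of memory if you make n copies of each frame for n consumers, but you could fairly easily come up with a mechanism that puts the frames themselves into a multiprocessing.sharedctypes.Array and passes only indices through the queues instead. As long as the queues are restricted to being shorter than the buffer, the producer is prevented from overwriting a frame in the buffer until all consumers have consumed it. Without any synchronization this would be flying by the seat of your pants, but a little bit of mutex magic can definitely make it a very robust solution.
For example: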
import numpy as np
from time import sleep
from multiprocessing import Process, freeze_support, Queue
from multiprocessing.sharedctypes import Array
from ctypes import c_uint8
from functools import reduce

BUFSHAPE = (10,10,10) #10 10x10 images in buffer

class Worker(Process):
    def __init__(self, q_size, buffer, name=''):
        super().__init__()
        self.queue = Queue(q_size)
        self.buffer = buffer
        self.name = name

    def run(self,): #do work here
        #I hardcoded datatype here. you might need to communicate it to the child process
        buf_arr = np.frombuffer(self.buffer.get_obj(), dtype=c_uint8)
        buf_arr.shape = BUFSHAPE
        while True:
            item = self.queue.get()
            if item == 'done':
                print('child process: {} completed all frames'.format(self.name))
                return
            with self.buffer.get_lock(): #prevent writing while we're reading
                #slice the frame from the array using the index that was sent
                frame = buf_arr[item%BUFSHAPE[0]] #depending on your use, you may want to make a copy here
            #do some intense processing on `frame`
            sleep(np.random.rand())
            print('child process: {} completed frame: {}'.format(self.name, item))

def main():
    #creating shared array
    buffer = Array(c_uint8, reduce(lambda a,b: a*b, BUFSHAPE))
    #make a numpy.array using that memory location to make it easy to stuff data into it
    buf_arr = np.frombuffer(buffer.get_obj(), dtype=c_uint8)
    buf_arr.shape = BUFSHAPE
    #create a list of workers
    workers = [Worker(BUFSHAPE[0]-2, #smaller queue than buffer to prevent overwriting frames not yet consumed
                      buffer, #pass in shared buffer array
                      str(i)) #numbered child processes
                      for i in range(5)] #5 workers

    for worker in workers: #start the workers
        worker.start()
    for i in range(100): #generate 100 random frames to send to workers
        #insert a frame into the buffer
        with buffer.get_lock(): #prevent reading while we're writing
            buf_arr[i%BUFSHAPE[0]] = np.random.randint(0,255, size=(10,10), dtype=c_uint8)
        #send the frame number to each worker for processing. If the input queue is full, this will block until there's space
        # this is what prevents `buf_arr[i%BUFSHAPE[0]] = np...` from overwriting a frame that hasn't been processed yet
        for worker in workers:
            worker.queue.put(i)
    #when we're done send the 'done' signal so the child processes exit gracefully (or you could make them daemons)
    for worker in workers:
        worker.queue.put('done')
        worker.join()

if __name__ == "__main__":
    freeze_support()
    main()
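The "you may want to make a copy here" comment deserves a quick illustration. As far as I understand it, `np.frombuffer` over the shared `Array` gives a view rather than a copy, and slicing a frame out of it gives another view, so a frame taken under the lock can still change once the lock is released. A minimal single-process sketch, with a small made-up buffer shape and the hypothetical names `shared`, `view` and `snapshot`:

```python
import numpy as np
from ctypes import c_uint8
from multiprocessing.sharedctypes import Array

BUFSHAPE = (4, 2, 2)  # illustrative only: 4 frames of 2x2 pixels

shared = Array(c_uint8, int(np.prod(BUFSHAPE)))
buf_arr = np.frombuffer(shared.get_obj(), dtype=np.uint8).reshape(BUFSHAPE)

with shared.get_lock():
    buf_arr[1] = 7                 # write "frame 1" through the numpy view

with shared.get_lock():
    view = buf_arr[1]              # still backed by the shared memory
    snapshot = buf_arr[1].copy()   # private copy, independent of the buffer

with shared.get_lock():
    buf_arr[1] = 9                 # the producer reuses the slot later on

# the view follows the overwrite, the snapshot keeps the old frame
print(view[0, 0], snapshot[0, 0])
```

If the processing step is long, copying under the lock is probably the safer default; the view only makes sense if something else guarantees the slot will not be reused in the meantime.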
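EDIT
Some sort of off-by-one error requires the queue to be 2 frames smaller than the buffer, rather than 1 frame smaller, to prevent a frame from being overwritten before its time.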
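The size requirement can be checked by brute force without spawning any processes. The sketch below is my own simplified model, not part of the example: one producer and one worker, each step atomic (write frame `i` into slot `i % buf_len`, put `i` on the queue unless it is full, get an index, read that slot). The function name `overwrite_possible` and its parameters are hypothetical. It walks every interleaving and reports whether the worker can ever read a slot that no longer holds the frame it was told about.

```python
from collections import deque


def overwrite_possible(buf_len, q_size, n_frames=None):
    """Return True if some interleaving lets the worker read a clobbered slot.

    Simplified model (an assumption): one producer, one worker, atomic steps.
    """
    if n_frames is None:
        n_frames = 3 * buf_len
    # state = (next frame i, was i written yet?, queue, index held by worker, buffer)
    start = (0, False, (), None, (None,) * buf_len)
    seen = {start}
    todo = deque([start])
    while todo:
        i, written, queue, held, buf = todo.popleft()
        if held is not None and buf[held % buf_len] != held:
            return True  # slot was overwritten between get() and the read
        nxt = []
        if i < n_frames and not written:
            # producer writes frame i into its slot
            slot = i % buf_len
            nxt.append((i, True, queue, held, buf[:slot] + (i,) + buf[slot + 1:]))
        elif i < n_frames and len(queue) < q_size:
            # producer pushes the index; with a full queue it stays blocked
            nxt.append((i + 1, False, queue + (i,), held, buf))
        if held is None and queue:
            # worker pulls the next index off the queue
            nxt.append((i, written, queue[1:], queue[0], buf))
        elif held is not None:
            # worker finishes reading the slot it holds
            nxt.append((i, written, queue, None, buf))
        for state in nxt:
            if state not in seen:
                seen.add(state)
                todo.append(state)
    return False
```

Under this model `overwrite_possible(10, 9)` comes out `True` and `overwrite_possible(10, 8)` comes out `False`, which matches the `BUFSHAPE[0]-2` used for the queue size.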
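EDIT2 - explanation of the first edit:
The reason for len(q) = len(buf)-2 seems to be that q.get() is called before we acquire the frame from the buffer, and the frame itself is written before we try to push its index onto the queue. If the length difference is only 1, a worker might pull a frame index off the queue, then the producer might see that it can now push to the queue and move on to the next frame before the worker has had a chance to read the frame itself. There are many different ways you could approach this that might leave the processes waiting on each other less, perhaps using mp.Event.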
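One possible shape for such an alternative is sketched below. Note that it swaps the mp.Event idea for a `multiprocessing.Condition` plus a shared per-slot counter, which is my assumption and not something taken from the example above. The class `SlotTracker` and its methods `claim` and `release` are hypothetical names. The producer would call `claim(i % n_slots)` before writing frame `i`, and each consumer would call `release(...)` once it has read or copied that frame, so the queue length no longer has to be tied to the buffer length.

```python
from ctypes import c_int
from multiprocessing import Condition
from multiprocessing.sharedctypes import RawArray


class SlotTracker:
    """Hypothetical helper: counts how many consumers still need each slot."""

    def __init__(self, n_slots, n_consumers):
        self.n_consumers = n_consumers
        self.pending = RawArray(c_int, n_slots)  # zero-initialised shared counters
        self.cond = Condition()                  # guards `pending`

    def claim(self, slot, timeout=None):
        """Producer side: wait until nobody needs `slot`, then reserve it.

        Returns False if the slot is still in use after `timeout` seconds.
        """
        with self.cond:
            if not self.cond.wait_for(lambda: self.pending[slot] == 0, timeout):
                return False
            self.pending[slot] = self.n_consumers
            return True

    def release(self, slot):
        """Consumer side: call after the frame in `slot` has been read."""
        with self.cond:
            self.pending[slot] -= 1
            if self.pending[slot] == 0:
                self.cond.notify_all()  # wake a producer waiting on this slot
```

The tracker would be created in the parent and handed to each `Worker` the same way the shared buffer is. The trade-off is one extra lock round trip per frame per consumer in exchange for the producer only ever blocking on a slot that is genuinely still in use.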