【发布时间】:2017-12-19 23:55:04
【问题描述】:
这与我仍在努力解决的earlier problem 有关。本质上,我需要 ProcessPoolExecutor 的逆向设计,其中我有许多查询进程和一个工人,它分批计算和发回结果。
使用一个共享队列很容易发送工作项,但我仍然没有一个很好的解决方案来将所有结果发送回正确进程上的正确线程。
【问题讨论】:
这与我仍在努力解决的earlier problem 有关。本质上,我需要 ProcessPoolExecutor 的逆向设计,其中我有许多查询进程和一个工人,它分批计算和发回结果。
使用一个共享队列很容易发送工作项,但我仍然没有一个很好的解决方案来将所有结果发送回正确进程上的正确线程。
【问题讨论】:
我认为为每个查询进程设置一个单独的multiprocessing.pipe 是最有意义的。工作进程等待任何管道上的可用项目,然后将其出列并处理它,跟踪它来自哪个管道。当需要发回数据时,它会将结果反馈到正确的管道中。
这是一个简单的例子:
#!/usr/bin/env python3
import multiprocessing as mp
def worker(pipes):
quit = [False] * len(pipes)
results = [''] * len(pipes)
# Wait for all workers to send None before quitting
while not all(quit):
ready = mp.connection.wait(pipes)
for pipe in ready:
# Get index of query proc's pipe
i = pipes.index(pipe)
# Receive and "process"
obj = pipe.recv()
if obj is None:
quit[i] = True
continue
result = str(obj)
results[i] += result
# Send back to query proc
pipes[i].send(result)
print(results)
def query(pipe):
for i in 'do some work':
pipe.send(i)
assert pipe.recv() == i
pipe.send(None) # Send sentinel
if __name__ == '__main__':
nquery_procs = 8
work_pipes, query_pipes = zip(*(mp.Pipe() for _ in range(nquery_procs)))
query_procs = [mp.Process(target=query, args=(pipe,)) for pipe in query_pipes]
for p in query_procs:
p.start()
worker(work_pipes)
for p in query_procs:
p.join()
或者,您可以为每个查询进程提供一个 ID 号(可能只是其管道的索引),并且任何请求都必须是一个元组,即 (id_num, data)。这只是绕过在每个循环上执行pipes.index(pipe) 的工作进程,所以我不确定它能给你带来多少。
【讨论】:
multiprocessing.Pipe 对象,那么,不,它是通过操作系统管道或套接字发送的,而不是在共享内存中。酸洗将对象打包/解包为要通过此管道发送的字节。