您需要multiprocessing.Pipe 或multiprocessing.Queue 将结果发送回您的父进程。如果你只是做 I/0,你应该使用Thread 而不是Process,因为它更轻量级并且大部分时间都花在等待上。我正在向您展示它是如何为进程和线程完成的。
使用队列处理
多处理队列建立在管道之上,访问与锁/信号量同步。队列是线程和进程安全的,这意味着您可以将一个队列用于多个生产者/消费者进程,甚至这些进程中的多个线程。在队列中添加第一项也将在调用过程中启动一个馈线线程。 multiprocessing.Queue 的额外开销使得在单生产者/单消费者场景中使用管道更可取且性能更高。
以下是使用multiprocessing.Queue 发送和检索结果的方法:
from multiprocessing import Process, Queue
SENTINEL = 'SENTINEL'
def sim_busy(out_queue, x):
for _ in range(int(x)):
assert 1 == 1
result = x
out_queue.put(result)
# If all results are enqueued, send a sentinel-value to let the parent know
# no more results will come.
out_queue.put(SENTINEL)
if __name__ == '__main__':
out_queue = Queue()
p = Process(target=sim_busy, args=(out_queue, 150e6)) # 150e6 == 150000000.0
p.start()
for result in iter(out_queue.get, SENTINEL): # sentinel breaks the loop
print(result)
队列作为参数传递给函数,结果是队列中的.put() 和队列中的父get.()s。 .get() 是一个阻塞调用,直到要获取 的东西(指定超时参数是可能的)才会恢复执行。请注意 sim_busy 在这里所做的工作是 cpu 密集型的,此时您会选择进程而不是线程。
工艺和管道
对于一对一的连接,管道就足够了。设置几乎相同,只是方法名称不同,对Pipe() 的调用会返回两个连接对象。在双工模式下,两个对象都是读写端,duplex=False(单工)第一个连接对象是管道的读端,第二个是写端。在这个基本场景中,我们只需要一个单工管道:
from multiprocessing import Process, Pipe
SENTINEL = 'SENTINEL'
def sim_busy(write_conn, x):
for _ in range(int(x)):
assert 1 == 1
result = x
write_conn.send(result)
# If all results are send, send a sentinel-value to let the parent know
# no more results will come.
write_conn.send(SENTINEL)
if __name__ == '__main__':
# duplex=False because we just need one-way communication in this case.
read_conn, write_conn = Pipe(duplex=False)
p = Process(target=sim_busy, args=(write_conn, 150e6)) # 150e6 == 150000000.0
p.start()
for result in iter(read_conn.recv, SENTINEL): # sentinel breaks the loop
print(result)
线程和队列
要与线程一起使用,您想切换到queue.Queue。 queue.Queue 构建在 collections.deque 之上,添加了一些锁以使其成为线程安全的。与多处理的队列和管道不同,放在queue.Queue 上的对象不会被腌制。由于线程共享相同的内存地址空间,内存复制的序列化是不必要的,只传输指针。
from threading import Thread
from queue import Queue
import time
SENTINEL = 'SENTINEL'
def sim_io(out_queue, query):
time.sleep(1)
result = query + '_result'
out_queue.put(result)
# If all results are enqueued, send a sentinel-value to let the parent know
# no more results will come.
out_queue.put(SENTINEL)
if __name__ == '__main__':
out_queue = Queue()
p = Thread(target=sim_io, args=(out_queue, 'my_query'))
p.start()
for result in iter(out_queue.get, SENTINEL): # sentinel-value breaks the loop
print(result)
- 阅读here为什么
for result in iter(out_queue.get, SENTINEL):
在可能的情况下,应该优先于 while True...break 设置。
- 阅读 here 为什么应该在所有脚本中使用
if __name__ == '__main__':,尤其是在多处理中。
- 更多关于
get()-usage here。