【发布时间】:2012-01-17 18:42:29
【问题描述】:
Python's multiprocessing package中的队列和管道的根本区别是什么?
在什么情况下应该选择一个而不是另一个?什么时候使用Pipe() 比较有利?什么时候使用Queue()比较有利?
【问题讨论】:
标签: python performance queue multiprocessing pipe
Python's multiprocessing package中的队列和管道的根本区别是什么?
在什么情况下应该选择一个而不是另一个?什么时候使用Pipe() 比较有利?什么时候使用Queue()比较有利?
【问题讨论】:
标签: python performance queue multiprocessing pipe
何时使用它们
如果您需要两个以上的点进行通信,请使用Queue()。
如果您需要绝对的性能,Pipe() 会更快,因为Queue() 是建立在Pipe() 之上的。
绩效基准测试
假设您想要生成两个进程并尽快在它们之间发送消息。这些是使用 Pipe() 和 Queue() 的类似测试之间的拉力赛的计时结果...这是在运行 Ubuntu 11.10 和 Python 2.7.2 的 ThinkpadT61 上。
仅供参考,我提供了JoinableQueue() 的结果作为奖励; JoinableQueue() 在调用queue.task_done() 时计算任务(它甚至不知道具体的任务,它只计算队列中未完成的任务),以便queue.join() 知道工作已经完成。
此答案底部的每个代码...
mpenning@mpenning-T61:~$ python multi_pipe.py
Sending 10000 numbers to Pipe() took 0.0369849205017 seconds
Sending 100000 numbers to Pipe() took 0.328398942947 seconds
Sending 1000000 numbers to Pipe() took 3.17266988754 seconds
mpenning@mpenning-T61:~$ python multi_queue.py
Sending 10000 numbers to Queue() took 0.105256080627 seconds
Sending 100000 numbers to Queue() took 0.980564117432 seconds
Sending 1000000 numbers to Queue() took 10.1611330509 seconds
mpnening@mpenning-T61:~$ python multi_joinablequeue.py
Sending 10000 numbers to JoinableQueue() took 0.172781944275 seconds
Sending 100000 numbers to JoinableQueue() took 1.5714070797 seconds
Sending 1000000 numbers to JoinableQueue() took 15.8527247906 seconds
mpenning@mpenning-T61:~$
总之Pipe() 比Queue() 快大约三倍。甚至不要考虑JoinableQueue(),除非你真的必须拥有这些好处。
奖励材料 2
多处理会在信息流中引入细微的变化,除非您知道一些捷径,否则会使调试变得困难。例如,您可能有一个脚本在许多条件下通过字典进行索引时工作正常,但很少会因某些输入而失败。
通常当整个python进程崩溃时我们会得到失败的线索;但是,如果多处理功能崩溃,您不会将未经请求的崩溃回溯打印到控制台。如果不知道是什么导致了进程崩溃,就很难追踪未知的多进程崩溃。
我发现追踪多处理崩溃信息的最简单方法是将整个多处理函数包装在 try / except 中并使用 traceback.print_exc():
import traceback
def run(self, args):
try:
# Insert stuff to be multiprocessed here
return args[0]['that']
except:
print "FATAL: reader({0}) exited while multiprocessing".format(args)
traceback.print_exc()
现在,当您发现崩溃时,您会看到如下内容:
FATAL: reader([{'crash': 'this'}]) exited while multiprocessing
Traceback (most recent call last):
File "foo.py", line 19, in __init__
self.run(args)
File "foo.py", line 46, in run
KeyError: 'that'
源代码:
"""
multi_pipe.py
"""
from multiprocessing import Process, Pipe
import time
def reader_proc(pipe):
## Read from the pipe; this will be spawned as a separate Process
p_output, p_input = pipe
p_input.close() # We are only reading
while True:
msg = p_output.recv() # Read from the output pipe and do nothing
if msg=='DONE':
break
def writer(count, p_input):
for ii in xrange(0, count):
p_input.send(ii) # Write 'count' numbers into the input pipe
p_input.send('DONE')
if __name__=='__main__':
for count in [10**4, 10**5, 10**6]:
# Pipes are unidirectional with two endpoints: p_input ------> p_output
p_output, p_input = Pipe() # writer() writes to p_input from _this_ process
reader_p = Process(target=reader_proc, args=((p_output, p_input),))
reader_p.daemon = True
reader_p.start() # Launch the reader process
p_output.close() # We no longer need this part of the Pipe()
_start = time.time()
writer(count, p_input) # Send a lot of stuff to reader_proc()
p_input.close()
reader_p.join()
print("Sending {0} numbers to Pipe() took {1} seconds".format(count,
(time.time() - _start)))
"""
multi_queue.py
"""
from multiprocessing import Process, Queue
import time
import sys
def reader_proc(queue):
## Read from the queue; this will be spawned as a separate Process
while True:
msg = queue.get() # Read from the queue and do nothing
if (msg == 'DONE'):
break
def writer(count, queue):
## Write to the queue
for ii in range(0, count):
queue.put(ii) # Write 'count' numbers into the queue
queue.put('DONE')
if __name__=='__main__':
pqueue = Queue() # writer() writes to pqueue from _this_ process
for count in [10**4, 10**5, 10**6]:
### reader_proc() reads from pqueue as a separate process
reader_p = Process(target=reader_proc, args=((pqueue),))
reader_p.daemon = True
reader_p.start() # Launch reader_proc() as a separate python process
_start = time.time()
writer(count, pqueue) # Send a lot of stuff to reader()
reader_p.join() # Wait for the reader to finish
print("Sending {0} numbers to Queue() took {1} seconds".format(count,
(time.time() - _start)))
"""
multi_joinablequeue.py
"""
from multiprocessing import Process, JoinableQueue
import time
def reader_proc(queue):
## Read from the queue; this will be spawned as a separate Process
while True:
msg = queue.get() # Read from the queue and do nothing
queue.task_done()
def writer(count, queue):
for ii in xrange(0, count):
queue.put(ii) # Write 'count' numbers into the queue
if __name__=='__main__':
for count in [10**4, 10**5, 10**6]:
jqueue = JoinableQueue() # writer() writes to jqueue from _this_ process
# reader_proc() reads from jqueue as a different process...
reader_p = Process(target=reader_proc, args=((jqueue),))
reader_p.daemon = True
reader_p.start() # Launch the reader process
_start = time.time()
writer(count, jqueue) # Send a lot of stuff to reader_proc() (in different process)
jqueue.join() # Wait for the reader to finish
print("Sending {0} numbers to JoinableQueue() took {1} seconds".format(count,
(time.time() - _start)))
【讨论】:
Queue() 的另一个值得注意的特性是馈线。 This 部分注释“当一个进程第一次将一个项目放入队列时,一个馈线线程将启动,它将对象从缓冲区传输到管道中。”可以将无限数量(或最大大小)的项目插入Queue(),而无需对queue.put() 进行任何调用阻塞。这允许您将多个项目存储在 Queue() 中,直到您的程序准备好处理它们。
另一方面,Pipe() 对已发送到一个连接但尚未从另一个连接接收到的项目具有有限的存储量。在此存储空间用完后,对connection.send() 的调用将被阻塞,直到有空间写入整个项目。这将停止执行写入的线程,直到其他线程从管道中读取。 Connection 对象使您可以访问底层文件描述符。在 *nix 系统上,您可以使用 os.set_blocking() 函数防止 connection.send() 调用阻塞。但是,如果您尝试发送不适合管道文件的单个项目,这将导致问题。最新版本的 Linux 允许您增加文件的大小,但允许的最大大小因系统配置而异。因此,您永远不应该依赖Pipe() 来缓冲数据。对connection.send 的调用可能会阻塞,直到从其他管道中读取数据。
总之,当你需要缓冲数据时,队列是比管道更好的选择。即使您只需要在两点之间进行通信。
【讨论】:
put 方法的the documentation 仍将其声明为阻塞或失败方法:“如果可选参数块为 True(默认) 并且 timeout 是 None (默认值),如果有必要阻塞,直到空闲槽可用。如果 timeout 是一个正数,它最多阻塞 timeout 秒并引发队列。如果在那段时间内没有空闲槽可用,则完全异常。 "你确定你的答案吗?
Queue 的构造函数的maxsize 参数被指定,put 方法将被阻塞。但这将是因为队列中的项目数量,而不是单个项目的大小。