【问题标题】:Multiprocessing - Pipe vs Queue多处理 - 管道与队列
【发布时间】:2012-01-17 18:42:29
【问题描述】:

Python's multiprocessing package中的队列和管道的根本区别是什么?

在什么情况下应该选择一个而不是另一个?什么时候使用Pipe() 比较有利?什么时候使用Queue()比较有利?

【问题讨论】:

    标签: python performance queue multiprocessing pipe


    【解决方案1】:
    • Pipe() 只能有两个端点。

    • Queue() 可以有多个生产者和消费者。

    何时使用它们

    如果您需要两个以上的点进行通信,请使用Queue()

    如果您需要绝对的性能,Pipe() 会更快,因为Queue() 是建立在Pipe() 之上的。

    绩效基准测试

    假设您想要生成两个进程并尽快在它们之间发送消息。这些是使用 Pipe()Queue() 的类似测试之间的拉力赛的计时结果...这是在运行 Ubuntu 11.10 和 Python 2.7.2 的 ThinkpadT61 上。

    仅供参考,我提供了JoinableQueue() 的结果作为奖励; JoinableQueue() 在调用queue.task_done() 时计算任务(它甚至不知道具体的任务,它只计算队列中未完成的任务),以便queue.join() 知道工作已经完成。

    此答案底部的每个代码...

    mpenning@mpenning-T61:~$ python multi_pipe.py 
    Sending 10000 numbers to Pipe() took 0.0369849205017 seconds
    Sending 100000 numbers to Pipe() took 0.328398942947 seconds
    Sending 1000000 numbers to Pipe() took 3.17266988754 seconds
    mpenning@mpenning-T61:~$ python multi_queue.py 
    Sending 10000 numbers to Queue() took 0.105256080627 seconds
    Sending 100000 numbers to Queue() took 0.980564117432 seconds
    Sending 1000000 numbers to Queue() took 10.1611330509 seconds
    mpnening@mpenning-T61:~$ python multi_joinablequeue.py 
    Sending 10000 numbers to JoinableQueue() took 0.172781944275 seconds
    Sending 100000 numbers to JoinableQueue() took 1.5714070797 seconds
    Sending 1000000 numbers to JoinableQueue() took 15.8527247906 seconds
    mpenning@mpenning-T61:~$
    

    总之Pipe()Queue() 快大约三倍。甚至不要考虑JoinableQueue(),除非你真的必须拥有这些好处。

    奖励材料 2

    多处理会在信息流中引入细微的变化,除非您知道一些捷径,否则会使调试变得困难。例如,您可能有一个脚本在许多条件下通过字典进行索引时工作正常,但很少会因某些输入而失败。

    通常当整个python进程崩溃时我们会得到失败的线索;但是,如果多处理功能崩溃,您不会将未经请求的崩溃回溯打印到控制台。如果不知道是什么导致了进程崩溃,就很难追踪未知的多进程崩溃。

    我发现追踪多处理崩溃信息的最简单方法是将整个多处理函数包装在 try / except 中并使用 traceback.print_exc()

    import traceback
    def run(self, args):
        try:
            # Insert stuff to be multiprocessed here
            return args[0]['that']
        except:
            print "FATAL: reader({0}) exited while multiprocessing".format(args) 
            traceback.print_exc()
    

    现在,当您发现崩溃时,您会看到如下内容:

    FATAL: reader([{'crash': 'this'}]) exited while multiprocessing
    Traceback (most recent call last):
      File "foo.py", line 19, in __init__
        self.run(args)
      File "foo.py", line 46, in run
        KeyError: 'that'
    

    源代码:


    """
    multi_pipe.py
    """
    from multiprocessing import Process, Pipe
    import time
    
    def reader_proc(pipe):
        ## Read from the pipe; this will be spawned as a separate Process
        p_output, p_input = pipe
        p_input.close()    # We are only reading
        while True:
            msg = p_output.recv()    # Read from the output pipe and do nothing
            if msg=='DONE':
                break
    
    def writer(count, p_input):
        for ii in xrange(0, count):
            p_input.send(ii)             # Write 'count' numbers into the input pipe
        p_input.send('DONE')
    
    if __name__=='__main__':
        for count in [10**4, 10**5, 10**6]:
            # Pipes are unidirectional with two endpoints:  p_input ------> p_output
            p_output, p_input = Pipe()  # writer() writes to p_input from _this_ process
            reader_p = Process(target=reader_proc, args=((p_output, p_input),))
            reader_p.daemon = True
            reader_p.start()     # Launch the reader process
    
            p_output.close()       # We no longer need this part of the Pipe()
            _start = time.time()
            writer(count, p_input) # Send a lot of stuff to reader_proc()
            p_input.close()
            reader_p.join()
            print("Sending {0} numbers to Pipe() took {1} seconds".format(count,
                (time.time() - _start)))
    

    """
    multi_queue.py
    """
    
    from multiprocessing import Process, Queue
    import time
    import sys
    
    def reader_proc(queue):
        ## Read from the queue; this will be spawned as a separate Process
        while True:
            msg = queue.get()         # Read from the queue and do nothing
            if (msg == 'DONE'):
                break
    
    def writer(count, queue):
        ## Write to the queue
        for ii in range(0, count):
            queue.put(ii)             # Write 'count' numbers into the queue
        queue.put('DONE')
    
    if __name__=='__main__':
        pqueue = Queue() # writer() writes to pqueue from _this_ process
        for count in [10**4, 10**5, 10**6]:             
            ### reader_proc() reads from pqueue as a separate process
            reader_p = Process(target=reader_proc, args=((pqueue),))
            reader_p.daemon = True
            reader_p.start()        # Launch reader_proc() as a separate python process
    
            _start = time.time()
            writer(count, pqueue)    # Send a lot of stuff to reader()
            reader_p.join()         # Wait for the reader to finish
            print("Sending {0} numbers to Queue() took {1} seconds".format(count, 
                (time.time() - _start)))
    

    """
    multi_joinablequeue.py
    """
    from multiprocessing import Process, JoinableQueue
    import time
    
    def reader_proc(queue):
        ## Read from the queue; this will be spawned as a separate Process
        while True:
            msg = queue.get()         # Read from the queue and do nothing
            queue.task_done()
    
    def writer(count, queue):
        for ii in xrange(0, count):
            queue.put(ii)             # Write 'count' numbers into the queue
    
    if __name__=='__main__':
        for count in [10**4, 10**5, 10**6]:
            jqueue = JoinableQueue() # writer() writes to jqueue from _this_ process
            # reader_proc() reads from jqueue as a different process...
            reader_p = Process(target=reader_proc, args=((jqueue),))
            reader_p.daemon = True
            reader_p.start()     # Launch the reader process
            _start = time.time()
            writer(count, jqueue) # Send a lot of stuff to reader_proc() (in different process)
            jqueue.join()         # Wait for the reader to finish
            print("Sending {0} numbers to JoinableQueue() took {1} seconds".format(count, 
                (time.time() - _start)))
    

    【讨论】:

    • @Jonathan “总之 Pipe() 比 Queue() 快三倍”
    • 太棒了!很好的答案,很高兴您提供了基准!我只有两个小问题:(1)“快几个数量级”有点夸大其词。差值是 x3,大约是一个数量级的三分之一。只是说。 ;-); (2) 一个更公平的比较是运行 N 个工作人员,每个工作人员通过点对点管道与主线程通信,而运行 N 个工作人员的性能都从单个点对多点队列中拉出。
    • 致您的“奖励材料”……是的。如果您要对 Process 进行子类化,请将大部分“运行”方法放在 try 块中。这也是记录异常的有用方法。复制正常的异常输出:sys.stderr.write(''.join(traceback.format_exception(*(sys.exc_info()))))
    • @alexpinho98 - 但是您将需要一些带外数据和相关的信令模式,以表明您发送的不是常规数据而是错误数据。鉴于发起进程已经处于不可预测的状态,这可能要求太多了。
    • @JJC 用你的狡辩来狡辩,3x 大约是半个数量级,而不是三分之一 -- sqrt(10) =~ 3。
    【解决方案2】:

    Queue() 的另一个值得注意的特性是馈线。 This 部分注释“当一个进程第一次将一个项目放入队列时,一个馈线线程将启动,它将对象从缓冲区传输到管道中。”可以将无限数量(或最大大小)的项目插入Queue(),而无需对queue.put() 进行任何调用阻塞。这允许您将多个项目存储在 Queue() 中,直到您的程序准备好处理它们。

    另一方面,Pipe() 对已发送到一个连接但尚未从另一个连接接收到的项目具有有限的存储量。在此存储空间用完后,对connection.send() 的调用将被阻塞,直到有空间写入整个项目。这将停止执行写入的线程,直到其他线程从管道中读取。 Connection 对象使您可以访问底层文件描述符。在 *nix 系统上,您可以使用 os.set_blocking() 函数防止 connection.send() 调用阻塞。但是,如果您尝试发送不适合管道文件的单个项目,这将导致问题。最新版本的 Linux 允许您增加文件的大小,但允许的最大大小因系统配置而异。因此,您永远不应该依赖Pipe() 来缓冲数据。对connection.send 的调用可能会阻塞,直到从其他管道中读取数据。

    总之,当你需要缓冲数据时,队列是比管道更好的选择。即使您只需要在两点之间进行通信。

    【讨论】:

    • 您链接的部分对馈线线程做了注释,但put 方法的the documentation 仍将其声明为阻塞或失败方法:“如果可选参数块为 True(默认) 并且 timeout 是 None (默认值),如果有必要阻塞,直到空闲槽可用。如果 timeout 是一个正数,它最多阻塞 timeout 秒并引发队列。如果在那段时间内没有空闲槽可用,则完全异常。 "你确定你的答案吗?
    • 我确信我的回答。如果Queue 的构造函数的maxsize 参数被指定,put 方法将被阻塞。但这将是因为队列中的项目数量,而不是单个项目的大小。
    • 感谢您的澄清,我错过了那部分。
    猜你喜欢
    • 2018-06-27
    • 2016-02-19
    • 1970-01-01
    • 1970-01-01
    • 2011-03-30
    • 2013-10-02
    • 2013-06-18
    • 2022-01-21
    • 2017-10-21
    相关资源
    最近更新 更多