【问题标题】:Python multiprocessing with Queue (split loads dynamically)带有队列的 Python 多处理(动态拆分加载)
【发布时间】:2016-02-17 08:25:14
【问题描述】:

我正在尝试使用多处理来处理大量文件。 我试图将文件列表放入队列中,并让 3 个工作人员使用常见的队列数据类型来分担负载。但是,这似乎不起作用。可能我对多处理包中的队列有误解。 下面是示例源代码:

import multiprocessing
from multiprocessing import Queue

def worker(i, qu):
    """worker function"""
    while ~qu.empty():
        val=qu.get()
        print 'Worker:',i, ' start with file:',val
        j=1
        for k in range(i*10000,(i+1)*10000): # some time consuming process
            for j in range(i*10000,(i+1)*10000):
                j=j+k
        print 'Worker:',i, ' end with file:',val


if __name__ == '__main__':
    jobs = []

    qu=Queue()
    for j in range(100,110): # files numbers are from 100 to 110
        qu.put(j)

    for i in range(3): # 3 multiprocess
        p = multiprocessing.Process(target=worker, args=(i,qu))
        jobs.append(p)
        p.start()
    p.join()

感谢 cmets。 我知道使用 Pool 是最好的解决方案。

import multiprocessing
import time

def worker(val):
    """worker function"""
    print 'Worker: start with file:',val
    time.sleep(1.1)
    print 'Worker: end with file:',val


if __name__ == '__main__':
    file_list=range(100,110)
    p = multiprocessing.Pool(2)
    p.map(worker, file_list)

【问题讨论】:

  • 您遇到了什么问题?你期望什么输出,你得到什么输出?

标签: python multithreading queue multiprocessing


【解决方案1】:

两个问题:

1) 您仅在第三个进程中加入

2) 为什么不使用 multiprocessing.Pool?

3) qu.get() 上的竞争条件

1 & 3)

import multiprocessing
from multiprocessing import Queue

def worker(i, qu):
    """worker function"""
    while 1:
        try:
            val=qu.get(timeout)
        except  Queue.Empty: break# Yay no race condition
        print 'Worker:',i, ' start with file:',val
        j=1
        for k in range(i*10000,(i+1)*10000): # some time consuming process
            for j in range(i*10000,(i+1)*10000):
                j=j+k
        print 'Worker:',i, ' end with file:',val


if __name__ == '__main__':
    jobs = []

    qu=Queue()
    for j in range(100,110): # files numbers are from 100 to 110
        qu.put(j)

    for i in range(3): # 3 multiprocess
        p = multiprocessing.Process(target=worker, args=(i,qu))
        jobs.append(p)
        p.start()
    for p in jobs: #<--- join on all processes ...
        p.join()

2)

Pool的使用方法见:

https://docs.python.org/2/library/multiprocessing.html

【讨论】:

    【解决方案2】:

    您只加入了最后一个创建的进程。这意味着如果在第三个进程完成时第一个或第二个进程仍在工作,则您的主进程正在关闭并在其余进程完成之前将其杀死。

    你应该加入他们,等待他们完成:

        for p in jobs:
            p.join()
    

    另一件事是您应该考虑使用qu.get_nowait() 以摆脱qu.empty()qu.get() 之间的竞争条件。

    例如:

        try:
            while 1:
                message = self.queue.get_nowait()
                """ do something fancy here """ 
        except Queue.Empty:
            pass
    

    希望对你有帮助

    【讨论】:

      猜你喜欢
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 2018-06-30
      • 2012-07-11
      • 1970-01-01
      相关资源
      最近更新 更多