【问题标题】:python pool apply_async and map_async do not block on full queuepython pool apply_async 和 map_async 不会阻塞完整队列
【发布时间】:2012-03-24 23:59:58
【问题描述】:

我对 python 还很陌生。 我正在使用多处理模块读取标准输入上的文本行,以某种方式转换它们并将它们写入数据库。这是我的代码的 sn-p:

batch = []
pool = multiprocessing.Pool(20)
i = 0
for i, content in enumerate(sys.stdin):
    batch.append(content)
    if len(batch) >= 10000:
        pool.apply_async(insert, args=(batch,i+1))
        batch = []
pool.apply_async(insert, args=(batch,i))
pool.close()
pool.join()

现在一切正常,直到我开始处理巨大的输入文件(数亿行),然后通过管道传输到我的 python 程序中。在某些时候,当我的数据库变慢时,我会看到内存已满。

玩了一会儿,发现 pool.apply_async 和 pool.map_async 从来没有阻塞过,所以要处理的调用队列越来越大。

解决我的问题的正确方法是什么?我希望我可以设置一个参数,一旦达到某个队列长度,它将阻止 pool.apply_async 调用。 Java 中的 AFAIR 可以为此目的为 ThreadPoolExecutor 提供一个具有固定长度的 BlockingQueue。

谢谢!

【问题讨论】:

  • “原来 pool.apply_async 和 pool.map_async 永远不会阻塞” - 我一直在寻找的一切

标签: python design-patterns queue multiprocessing python-multiprocessing


【解决方案1】:

apply_asyncmap_async 函数旨在不阻塞主进程。为此,Pool 维护了一个内部Queue,不幸的是它的大小无法更改。

解决问题的方法是使用Semaphore 初始化为您希望队列的大小。在为池提供数据之前以及在工作人员完成任务之后获取和释放信号量。

这是一个使用 Python 2.6 或更高版本的示例。

from threading import Semaphore
from multiprocessing import Pool

def task_wrapper(f):
    """Python2 does not allow a callback for method raising exceptions,
    this wrapper ensures the code run into the worker will be exception free.

    """
    try:
        return f()
    except:
        return None

class TaskManager(object):
    def __init__(self, processes, queue_size):
        self.pool = Pool(processes=processes)
        self.workers = Semaphore(processes + queue_size)

    def new_task(self, f):
        """Start a new task, blocks if queue is full."""
        self.workers.acquire()
        self.pool.apply_async(task_wrapper, args=(f, ), callback=self.task_done))

    def task_done(self):
        """Called once task is done, releases the queue is blocked."""
        self.workers.release()

另一个example 使用concurrent.futures 池实现。

【讨论】:

  • 在我的情况下,task_done 需要接受参数,否则会引发 ValueError。也许改成task_done(self, *args, **kwargs)会更安全?
【解决方案2】:

以防万一有人在这里结束,这就是我解决问题的方法:我停止使用 multiprocessing.Pool。这是我现在的做法:

#set amount of concurrent processes that insert db data
processes = multiprocessing.cpu_count() * 2

#setup batch queue
queue = multiprocessing.Queue(processes * 2)

#start processes
for _ in range(processes): multiprocessing.Process(target=insert, args=(queue,)).start() 

#fill queue with batches    
batch=[]
for i, content in enumerate(sys.stdin):
    batch.append(content)
    if len(batch) >= 10000:
        queue.put((batch,i+1))
        batch = []
if batch:
    queue.put((batch,i+1))

#stop processes using poison-pill
for _ in range(processes): queue.put((None,None))

print "all done."

在插入方法中,每个批次的处理都包装在一个循环中,该循环从队列中拉出直到它收到毒丸:

while True:
    batch, end = queue.get()
    if not batch and not end: return #poison pill! complete!
    [process the batch]
print 'worker done.'

【讨论】:

  • 不错的简单示例。多处理的池通常比它的价值更麻烦,特别是因为创建自己的进程池非常简单。
【解决方案3】:

apply_async 返回一个AsyncResult 对象,你可以wait 开启:

if len(batch) >= 10000:
    r = pool.apply_async(insert, args=(batch, i+1))
    r.wait()
    batch = []

尽管如果您想以更简洁的方式执行此操作,您应该使用 maxsize 为 10000 的 multiprocessing.Queue,并从从此类队列中获取的 multiprocessing.Process 派生一个 Worker 类。

【讨论】:

  • 很好地等待 AsyncResult 无济于事,因为我的问题是池中的队列变大了。不知能否控制池中内部队列的大小?
  • @konstantin:我不确定我是否理解。在等待AsyncResult 时,主进程无法填充下一批,对吧?
【解决方案4】:

不漂亮,但您可以访问内部队列大小并等待它低于您的最大期望大小,然后再添加新项目:

max_pool_queue_size = 20

for i in range(10000):
  pool.apply_async(some_func, args=(...))

  while pool._taskqueue.qsize() > max_pool_queue_size:
    time.sleep(1)

【讨论】:

    猜你喜欢
    • 2014-10-16
    • 1970-01-01
    • 2016-01-18
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多