【问题标题】:multiprocessing Pool and generators多处理池和生成器
【发布时间】:2018-12-06 19:15:59
【问题描述】:

首先看下面的代码:

pool = multiprocessing.Pool(processes=N)
batch = []
for item in generator():
    batch.append(item)
    if len(batch) == 10:
        pool.apply_async(my_fun, args=(batch,))
        batch = []
# leftovers
pool.apply_async(my_fun, args=(batch,))

基本上我是从生成器中检索数据,收集到一个列表中,然后生成一个消耗这批数据的进程。

这可能看起来不错,但是当消费者(又名池进程)比生产者(又名生成器)慢时,主进程的内存使用量会增长,直到生成器停止或...系统内存不足。

如何避免这个问题?

【问题讨论】:

  • 您是否尝试过构建列表列表并使用pool.map_async()?或者starmap_async??
  • 查看类似问题stackoverflow.com/questions/17241663/…,了解如何将队列与进程池结合使用。
  • apply_async 返回一个AsyncResult 对象,我没有看到你在任何地方使用它。

标签: python multiprocessing generator


【解决方案1】:

在这种情况下,您可能希望使用有限大小的队列。

q = multiprocessing.Queue(maxSize).

当与最大值一起使用时。大小,这将为您提供必要的计数并在调用 q.put() 已满时阻塞线程,因此您永远不能在其上发布超过一定数量的工作项,从而限制存储所需的内存待处理的项目。

或者,您可以使用计数信号量(例如,multiprocessing.BoundedSemaphore(maxSize))。每次从生成器获取工作项时获取它,并在处理项后将其释放到工作函数 (my_fun) 中。这样,等待处理的工作项的最大数量永远不会超过信号量的初始值。

【讨论】:

  • 谢谢,我认为信号量可以。
【解决方案2】:

使用grouper itertools 配方将生成器中的数据分块

使用concurrent futures 中的基础架构,通过流程处理任务提交和检索。

你可以

  • 提交一组任务;等待他们完成;然后提交另一个组,或
  • 通过在每次完成任务时提交一个新任务来保持管道满载。

设置(尝试模拟您的过程):

import concurrent.futures
import itertools, time, collections, random
from pprint import pprint

# from itertools recipes
def grouper(iterable, n, fillvalue=None):
    "Collect data into fixed-length chunks or blocks"
    # grouper('ABCDEFG', 3, 'x') --> ABC DEF Gxx"
    args = [iter(iterable)] * n
    return itertools.zip_longest(*args, fillvalue=fillvalue)

# generator/iterator facsimile
class G:
    '''Long-winded range(n)'''
    def __init__(self, n=108):
        self.n = n
        self.a = []
    def __iter__(self):
        return self
    def __next__(self):
        #self.a.append(time.perf_counter())
        if self.n < 0:
            raise StopIteration
        x = self.n
        self.n -= 1
        return x

def my_func(*args):
    time.sleep(random.randint(1,10))
    return sum(*args)

等待任务组完成

if __name__ == '__main__':
    nworkers = 4
    g = G()
    # generate data three-at-a-time
    data = grouper(g, 3, 0)
    results = []
    fs = []
    with concurrent.futures.ProcessPoolExecutor(max_workers=nworkers) as executor:
        for args in data:
            print(f'pending:{len(executor._pending_work_items)}')
            # block submission - limit pending tasks to conserve resources (memory) 
            if len(executor._pending_work_items) == nworkers:
                # wait till all complete and get the results
                futures = concurrent.futures.wait(fs, return_when=concurrent.futures.ALL_COMPLETED)
                #print(futures)
                results.extend(future.result() for future in futures.done)
                fs = list(futures.not_done)
            # add a new task
            fs.append(executor.submit(my_func, args))
        # data exhausted - get leftover results as they finish
        for future in concurrent.futures.as_completed(fs):
            print(f'pending:{len(executor._pending_work_items)}')
            result = future.result()
            results.append(result)

    pprint(results)

保持进程池

if __name__ == '__main__':
    nworkers = 4
    g = G()
    # generate data three-at-a-time
    data = grouper(g, 3, 0)
    results = []
    fs = []
    with concurrent.futures.ProcessPoolExecutor(max_workers=nworkers) as executor:
        for args in data:
            print(f'pending:{len(executor._pending_work_items)}')
            # block submission - limit pending tasks to conserve resources (memory) 
            if len(executor._pending_work_items) == nworkers:
                # wait till one completes and get the result
                futures = concurrent.futures.wait(fs, return_when=concurrent.futures.FIRST_COMPLETED)
                #print(futures)
                results.extend(future.result() for future in futures.done)
                fs = list(futures.not_done)
            # add a new task
            fs.append(executor.submit(my_func, args))
        # data exhausted - get leftover results as they finish
        for future in concurrent.futures.as_completed(fs):
            print(f'pending:{len(executor._pending_work_items)}')
            result = future.result()
            results.append(result)

    pprint(results)

【讨论】:

  • 嗯,主要问题是数据不适合内存(因此使用了生成器),所以我看不出这是如何避免这种情况的。
  • 我同意@Manuel,这并不能解决内存问题(至少在理论上)。
猜你喜欢
  • 1970-01-01
  • 2016-04-18
  • 2020-08-20
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 2021-07-24
  • 2020-08-14
  • 2016-11-10
相关资源
最近更新 更多