【问题标题】:Python multiprocessing pool: dynamically set number of processes during execution of tasksPython多处理池:在任务执行期间动态设置进程数
【发布时间】:2018-04-12 08:01:52
【问题描述】:

我们在开发机器上使用 Python 2.7(由许多独立的并行进程组成)提交大型 CPU 密集型作业,这些作业一次持续数天。当这些作业与大量进程一起运行时,机器的响应速度会大大降低。理想情况下,我想在我们开发代码时限制白天可用的 CPU 数量,并在夜间尽可能高效地运行尽可能多的进程。

Python 多处理库允许您在启动池时指定进程数。有没有办法在每次启动新任务时动态更改这个数字?

例如,允许 20 个进程在 19-07 小时内运行,10 个进程在 07-19 小时内运行。

一种方法是检查使用大量 CPU 的活动进程的数量。这就是我希望它的工作方式:

from multiprocessing import Pool
import time 

pool = Pool(processes=20)

def big_task(x):
    while check_n_process(processes=10) is False:
        time.sleep(60*60)
    x += 1
    return x 


x = 1
multiple_results = [pool.apply_async(big_task, (x)) for i in range(1000)]
print([res.get() for res in multiple_results])

但我需要编写“check_n_process”函数。

还有其他想法可以解决这个问题吗?

(代码需要在 Python 2.7 中运行 - bash 实现是不可行的)。

【问题讨论】:

    标签: python multiprocessing


    【解决方案1】:

    Python multiprocessing.Pool 不提供更改正在运行的 Pool 的工作人员数量的方法。一个简单的解决方案是依赖第三方工具。

    billiard 提供的 Pool 用于提供这样的功能。

    CeleryLuigi 这样的任务队列框架肯定允许灵活的工作负载,但要复杂得多。

    如果使用外部依赖不可行,可以试试下面的方法。从this answer 阐述,您可以设置基于信号量的节流机制。

    from threading import Semaphore, Lock
    from multiprocessing import Pool
    
    def TaskManager(object):
        def __init__(self, pool_size):
            self.pool = Pool(processes=pool_size)
            self.workers = Semaphore(pool_size)
            # ensures the semaphore is not replaced while used
            self.workers_mutex = Lock()  
    
        def change_pool_size(self, new_size):
            """Set the Pool to a new size."""
            with self.workers_mutex:  
                self.workers = Semaphore(new_size)
    
        def new_task(self, task):
            """Start a new task, blocks if queue is full."""
            with self.workers_mutex:
                self.workers.acquire()
    
            self.pool.apply_async(big_task, args=[task], callback=self.task_done))
    
        def task_done(self):
            """Called once task is done, releases the queue is blocked."""
            with self.workers_mutex:
                self.workers.release()
    

    如果超过 X 个工作人员在忙,该池将阻止进一步尝试安排您的 big_tasks。通过控制这种机制,您可以限制同时运行的进程数量。当然,这意味着你放弃了Pool的排队机制。

    task_manager = TaskManager(20)
    
    while True:
        if seven_in_the_morning():
            task_manager.change_pool_size(10)
        if seven_in_the_evening():
            task_manager.change_pool_size(20)
    
        task = get_new_task()
        task_manager.new_task()  # blocks here if all workers are busy
    

    【讨论】:

      【解决方案2】:

      这是非常不完整的(也是一个老问题),但是您可以通过跟踪正在运行的进程并仅在合适时调用 apply_async() 来管理负载;如果每个作业的运行时间少于永久,您可以通过在工作时间或 os.getloadavg() 太高时分派更少的作业来降低负载。 我这样做是为了在运行多个“scp”以规避我们内部网络上的流量整形时管理网络负载(不要告诉任何人!)

      【讨论】:

        猜你喜欢
        • 2013-12-01
        • 2021-07-28
        • 2015-01-30
        • 1970-01-01
        • 2022-01-08
        • 2022-01-19
        • 1970-01-01
        • 2018-11-29
        • 1970-01-01
        相关资源
        最近更新 更多