【问题标题】:Python multiprocessing pool: How to get all processes running currentlyPython多处理池:如何让所有进程当前运行
【发布时间】:2019-11-27 17:18:12
【问题描述】:

使用 Python 3 的多处理 Pool,如何获取当前由工作进程执行的作业。在下面的简化示例中,我将变量 x 也定义为特定进程的 ID,这就是我要查找的内容。

from multiprocessing import Pool

done_tasks = []

def f(x):
    return x*x

def on_task_done(x):
    done_tasks.append(x)

if __name__ == '__main__':
    pool = multiprocessing.Pool(processes=3)
    ids = [0, 1, 2, 3, 4, ...]
    for id in ids:
        pool.apply_async(start_worker, id, callback=on_task_done)

    # wait and print currently running processes
    while True:
        # TODO print currently running processes

        if len(done_tasks) >= len(ids):
            break

具体来说,我如何才能找到工作进程在任何给定时间点当前正在执行的进程的 ID(例如,在循环中等待直到完成所有提交到池的作业)?

【问题讨论】:

    标签: python-3.x python-multiprocessing


    【解决方案1】:

    您可以使用Manager 实例来创建dict,该实例可用于存储当前正在运行的作业的ID。您可以这样做:

    import uuid
    import time
    import random
    from multiprocessing import Pool, Manager
    
    
    done_tasks = []
    
    
    def square(x, procs):
        pid = uuid.uuid4().hex
        procs[pid] = True
    
        # Simulate a long-running task:
        time.sleep(random.randint(1, 3))
    
        procs[pid] = False
    
        return x * x
    
    
    def on_task_done(results):
        done_tasks.append(results)
    
    
    def main():
        m = Manager()
        procs = m.dict()
        pool = Pool(processes=2)
    
        for x in range(10):
            pool.apply_async(square, args=(x, procs), callback=on_task_done)
    
        while len(done_tasks) < 10:
            pids = [pid for pid, running in procs.items() if running]
            print('running jobs:', pids)
            time.sleep(1)
    
        print('results:', done_tasks)
    
        pool.close()
        pool.join()
    
    
    if __name__ == '__main__':
        main()
    

    输出:

    running jobs: []
    running jobs: ['949784a09e1d4f09b647cae584e86277', 'de3c23dd3c174e9a8d7f5c2a7d89a183']
    running jobs: ['de3c23dd3c174e9a8d7f5c2a7d89a183']
    running jobs: ['47f536e9e3584e808bd7475ad43ee49a']
    running jobs: ['e06d73341d774f86a8a00d0bb2914642', '874b194b548e4f47b02859b4f704a4f2']
    running jobs: ['874b194b548e4f47b02859b4f704a4f2', 'cb95a6bcc8ff47bba271cb845a919245']
    running jobs: ['3c9fbf7e4c604dedae033b7ef93c3523', '4863c18062504a8190b415bc42874bb7']
    running jobs: ['3c9fbf7e4c604dedae033b7ef93c3523', '4863c18062504a8190b415bc42874bb7']
    running jobs: ['3c9fbf7e4c604dedae033b7ef93c3523', '4863c18062504a8190b415bc42874bb7']
    running jobs: ['2f896b86aa1647fea63c522fb2c67684', 'aba21db1b1af4fd8896a6b59534a254f']
    running jobs: ['2f896b86aa1647fea63c522fb2c67684']
    running jobs: ['2f896b86aa1647fea63c522fb2c67684']
    results: [0, 1, 4, 9, 16, 25, 36, 49, 81, 64]
    

    【讨论】:

    • 您好,非常感谢您的回答!你确定tasks_done 是空的吗?我的非简化示例工作得很好,根据文档,回调函数(在简化示例中 on_task_done) 在主进程中执行(因此不是由每个工作进程调用),因此每当调用 on_task_done 时,它看到相同的列表。
    猜你喜欢
    • 2021-11-20
    • 2013-12-01
    • 2015-01-11
    • 2022-01-08
    • 1970-01-01
    • 1970-01-01
    • 2013-04-30
    • 1970-01-01
    • 2015-10-21
    相关资源
    最近更新 更多