【问题标题】:How to create a continuous stream of Python's concurrent.futures.ProcessPoolExecutor.submits()?如何创建 Python concurrent.futures.ProcessPoolExecutor.submit() 的连续流?
【发布时间】:2020-09-06 09:31:14
【问题描述】:

我可以提交批次的concurrent.futures.ProcessPoolExecutor.submits(),其中每批次可能包含多个submit()。但是,我注意到如果每批提交都消耗大量 RAM,则可能会出现相当多的 RAM 使用效率低下;需要等待批次中的所有期货都完成后,才能提交另一批submit()

如何创建 Python 的concurrent.futures.ProcessPoolExecutor.submit() 的连续流,直到满足某些条件?

测试脚本:

#!/usr/bin/env python3

import numpy as np
from numpy.random import default_rng, SeedSequence
import concurrent.futures as cf
from itertools import count


def dojob( process, iterations, samples, rg ):
    # Do some tasks
    result = []
    for i in range( iterations ):
        a = rg.standard_normal( samples )
        b = rg.integers( -3, 3, samples )
        mean = np.mean( a + b )
        result.append( ( i, mean ) )
        return { process : result }

if __name__ == '__main__':

    cpus = 2
    iterations = 10000
    samples = 1000

    # Setup NumPy Random Generator
    ss = SeedSequence( 1234567890 )
    child_seeds = ss.spawn( cpus )
    rg_streams = [ default_rng(s) for s in child_seeds ]

    # Peform concurrent analysis by batches
    counter = count( start=0, step=1 )

    # Serial Run of dojob
    process = next( counter )
    for cpu in range( cpus ):
        process = next( counter )
        rg = rg_streams[ cpu ]
        rdict = dojob( process, iterations, samples, rg )
    print( 'rdict', rdict )

    
    # Concurrent Run of dojob
    futures = []
    results = []
    with cf.ProcessPoolExecutor( max_workers=cpus ) as executor:

        while True:
            
            for cpu in range( cpus ):
                process = next( counter )
                rg = rg_streams[ cpu ]
                futures.append( executor.submit( dojob, process, iterations, samples, rg ) )
                
            for future in cf.as_completed( futures ):
                # Do some post processing
                r = future.result()
                for k, v in r.items():
                    if len( results ) < 5000:
                        results.append( np.std( v ) )
                        print( k, len(results) )

            if len(results) <= 100: #Put a huge number to simulate continuous streaming 
                futures = []
                child_seeds = child_seeds[0].spawn( cpus )
                rg_streams = [ default_rng(s) for s in child_seeds ]
            else:
                break

    print( '\n*** Concurrent Analyses Ended ***' )

【问题讨论】:

  • 请编辑一个在您的问题中显示此问题的最小示例 - 它应该是 minimal reproducible example,因此我应该能够将其粘贴到文件中并运行它以查看您所看到的问题。
  • @barny 感谢您的协助。我添加了一个测试脚本来广泛展示批处理方法。
  • 仅仅等待 任何 个期货完成难道不够吗(例如使用fut.add_done_callback()),一旦发生这种情况,看看是否有“空间”来催生新工作?您可以使用threading.Event 来有效地等待该结果。

标签: python concurrent.futures


【解决方案1】:

为了扩展我的评论,使用完成回调和threading.Condition 这样的事情怎么样?我也冒昧地添加了一个进度指示器。

编辑:我将它重构为一个简洁的函数,您可以传递所需的并发性和队列深度,以及一个生成新作业的函数,以及另一个处理结果并让执行程序知道的函数你是否受够了。

import concurrent.futures as cf
import threading
import time
from itertools import count

import numpy as np
from numpy.random import SeedSequence, default_rng


def dojob(process, iterations, samples, rg):
    # Do some tasks
    result = []
    for i in range(iterations):
        a = rg.standard_normal(samples)
        b = rg.integers(-3, 3, samples)
        mean = np.mean(a + b)
        result.append((i, mean))
    return {process: result}


def execute_concurrently(cpus, max_queue_length, get_job_fn, process_result_fn):
    running_futures = set()
    jobs_complete = 0
    job_cond = threading.Condition()
    all_complete_event = threading.Event()

    def on_complete(future):
        nonlocal jobs_complete
        if process_result_fn(future.result()):
            all_complete_event.set()
        running_futures.discard(future)
        jobs_complete += 1
        with job_cond:
            job_cond.notify_all()

    time_since_last_status = 0
    start_time = time.time()
    with cf.ProcessPoolExecutor(cpus) as executor:
        while True:
            while len(running_futures) < max_queue_length:
                fn, args = get_job_fn()
                fut = executor.submit(fn, *args)
                fut.add_done_callback(on_complete)
                running_futures.add(fut)

            with job_cond:
                job_cond.wait()

            if all_complete_event.is_set():
                break

            if time.time() - time_since_last_status > 1.0:
                rps = jobs_complete / (time.time() - start_time)
                print(
                    f"{len(running_futures)} running futures on {cpus} CPUs, "
                    f"{jobs_complete} complete. RPS: {rps:.2f}"
                )
                time_since_last_status = time.time()


def main():
    ss = SeedSequence(1234567890)
    counter = count(start=0, step=1)
    iterations = 10000
    samples = 1000
    results = []

    def get_job():
        seed = ss.spawn(1)[0]
        rg = default_rng(seed)
        process = next(counter)
        return dojob, (process, iterations, samples, rg)

    def process_result(result):
        for k, v in result.items():
            results.append(np.std(v))
        if len(results) >= 10000:
            return True  # signal we're complete

    execute_concurrently(
        cpus=16,
        max_queue_length=20,
        get_job_fn=get_job,
        process_result_fn=process_result,
    )


if __name__ == "__main__":
    main()

【讨论】:

  • Executor 在退出with 块时应该关闭。也许这些天我只是使用multiprocessing.Pool()来实现它...
  • 非常感谢您的回答。终于有时间去做了。我发布了一个附加答案,建议对您的算法进行两项修改,这对我有帮助。请随时发表评论。干杯。
【解决方案2】:

@AKX 发布的答案有效。向他致敬。经过测试,我想推荐两个我认为值得考虑和实施的修正。

修正1:要提前取消python脚本的执行,必须使用Ctrl+C。不幸的是,这样做不会终止正在执行函数dojob()concurrent.futures.ProcessPoolExecutor() 进程。当完成dojob() 所花费的时间较长时,此问题变得更加明显;这种情况可以通过使脚本中的样本量变大来模拟(例如samples = 100000)。执行终端命令ps -ef | grep python 时会出现此问题。此外,如果dojob() 消耗大量 RAM,则这些并发进程使用的内存在手动终止并发进程之前不会被释放(例如kill -9 [PID])。为了解决这些问题,需要进行以下修改。

with job_cond:
    job_cond.wait()

应该改为:

try:
    with job_cond:
        job_cond.wait()
except KeyboardInterrupt:
    # Cancel running futures
    for future in running_futures:
        _ = future.cancel()
    # Ensure concurrent.futures.executor jobs really do finish.
    _ = cf.wait(running_futures, timeout=None)

所以当必须使用 Ctrl+C 时,您只需先按一次即可。接下来,给running_futures 中的期货取消一些时间。这可能需要几秒钟到几秒钟才能完成;这取决于dojob() 的资源需求。您可以在任务管理器或系统监视器中看到 CPU 活动降至零,或者听到 cpu 冷却风扇降低的高转速声音。请注意,使用的 RAM 尚未释放。此后,再次按 Ctrl+C 应该允许所有并发进程干净退出,从而释放使用的 RAM。

修正 2: 目前,内部 while 循环规定作业必须以 cpu“mainThread”允许的速度连续提交。实际上,能够提交比 cpus 池中可用的 cpus 更多的作业没有任何好处。这样做只会不必要地消耗来自主处理器的“MainThread”的 cpu 资源。为了规范连续提交作业,可以使用一个新的submit_jobthreading.Event()对象。

首先,定义这样一个对象并将其值设置为True

submit_job = threading.Event()
submit_job.set()

接下来,在内部while循环的末尾添加这个条件和.wait()方法:

with cf.ProcessPoolExecutor(cpus) as executor:
    while True:
        while len(running_futures) < max_queue_length:
            fn, args = get_job_fn()
            fut = executor.submit(fn, *args)
            fut.add_done_callback(on_complete)
            running_futures.add(fut)
            if len(running_futures) >= cpus: # Add this line
                submit_job.clear()           # Add this line
            submit_job.wait()                # Add this line

最后把on_complete(future)回调改成:

def on_complete(future):
    nonlocal jobs_complete
    if process_result_fn(future.result()):
        all_complete_event.set()
    running_futures.discard(future)
    if len(running_futures) < cpus: # add this conditional setting
        submit_job.set()            # add this conditional setting
    jobs_complete += 1
    with job_cond:
        job_cond.notify_all()

【讨论】:

    猜你喜欢
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2018-12-21
    相关资源
    最近更新 更多