【问题标题】:Control the number of subprocesses using to call external commands in python在python中控制用于调用外部命令的子进程数
【发布时间】:2012-04-06 05:07:29
【问题描述】:

我了解使用subprocess 是调用外部命令的首选方式。

但是,如果我想并行运行多个命令,但要限制生成的进程数怎么办?困扰我的是我无法阻止子进程。例如,如果我调用

subprocess.Popen(cmd, stderr=outputfile, stdout=outputfile)

然后该过程将继续,无需等待cmd 完成。因此,我无法将其包装在multiprocessing 图书馆的工作人员中。

例如,如果我这样做:

def worker(cmd): 
    subprocess.Popen(cmd, stderr=outputfile, stdout=outputfile);

pool = Pool( processes = 10 );
results =[pool.apply_async(worker, [cmd]) for cmd in cmd_list];
ans = [res.get() for res in results];

然后每个工作人员将在生成子进程后完成并返回。所以我不能通过使用Pool 来真正限制subprocess 生成的进程数。

限制子进程数量的正确方法是什么?

【问题讨论】:

    标签: python parallel-processing subprocess multiprocessing


    【解决方案1】:

    如果您想等待命令完成,可以使用subprocess.call。请参阅pydoc subprocess 了解更多信息。

    你也可以在你的worker中调用Popen.wait方法:

    def worker(cmd): 
        p = subprocess.Popen(cmd, stderr=outputfile, stdout=outputfile);
        p.wait()
    

    【讨论】:

    • 这会完全禁用并行处理
    • 不应该。问题是使用multiprocessing 模块,并且每个工作人员都在一个单独的进程中产生,因此一个工作人员中的wait()ing 不会阻止其他工作人员运行。也就是说,这本身是不正确的——这个例子没有 return 来自工作人员的任何东西,所以在结果上调用 .get() 不会返回任何东西。
    【解决方案2】:

    您不需要多个 Python 进程甚至线程来限制最大并行子进程数:

    from itertools import izip_longest
    from subprocess import Popen, STDOUT
    
    groups = [(Popen(cmd, stdout=outputfile, stderr=STDOUT)
              for cmd in commands)] * limit # itertools' grouper recipe
    for processes in izip_longest(*groups): # run len(processes) == limit at a time
        for p in filter(None, processes):
            p.wait()
    

    Iterate an iterator by chunks (of n) in Python?

    如果您想同时限制最大和最小并行子进程数,您可以使用线程池:

    from multiprocessing.pool import ThreadPool
    from subprocess import STDOUT, call
    
    def run(cmd):
        return cmd, call(cmd, stdout=outputfile, stderr=STDOUT)
    
    for cmd, rc in ThreadPool(limit).imap_unordered(run, commands):
        if rc != 0:
            print('{cmd} failed with exit status: {rc}'.format(**vars()))
    

    只要任何limit 子进程结束,就会启动一个新的子进程以始终保持limit 子进程的数量。

    或者使用ThreadPoolExecutor:

    from concurrent.futures import ThreadPoolExecutor # pip install futures
    from subprocess import STDOUT, call
    
    with ThreadPoolExecutor(max_workers=limit) as executor:
        for cmd in commands:
            executor.submit(call, cmd, stdout=outputfile, stderr=STDOUT)
    

    这是一个简单的线程池实现:

    import subprocess
    from threading import Thread
    
    try: from queue import Queue
    except ImportError:
        from Queue import Queue # Python 2.x
    
    
    def worker(queue):
        for cmd in iter(queue.get, None):
            subprocess.check_call(cmd, stdout=outputfile, stderr=subprocess.STDOUT)
    
    q = Queue()
    threads = [Thread(target=worker, args=(q,)) for _ in range(limit)]
    for t in threads: # start workers
        t.daemon = True
        t.start()
    
    for cmd in commands:  # feed commands to threads
        q.put_nowait(cmd)
    
    for _ in threads: q.put(None) # signal no more commands
    for t in threads: t.join()    # wait for completion
    

    为避免过早退出,添加异常处理。

    如果您想在字符串中捕获子进程的输出,请参阅Python: execute cat subprocess in parallel

    【讨论】:

      猜你喜欢
      • 2014-11-13
      • 2016-09-30
      • 2021-04-11
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 2015-01-13
      • 1970-01-01
      相关资源
      最近更新 更多