【问题标题】:running several system commands in parallel in Python在 Python 中并行运行多个系统命令
【发布时间】:2011-02-14 12:59:11
【问题描述】:

我编写了一个简单的脚本,对一系列文件执行系统命令。 为了加快速度,我想并行运行它们,但不是一次全部运行——我需要控制同时运行的命令的最大数量。 解决这个问题的最简单方法是什么?

【问题讨论】:

  • @unholysampler:这个问题既与多线程无关,也与线程池无关。线程可能是给定问题的一种解决方案,但在我看来是一个糟糕的解决方案。我将再次删除这些标签。
  • @S.Lott。限制最大进程数似乎是合理的。想象一下,您有 100k 进程要启动,您会希望同时运行它们并生成所有进程?即使操作系统可以应付它......
  • @S.Lott 如果正在启动的进程是数据库密集型进程,您可以通过并行运行少量进程来加快速度,但在某个点争用之后会导致速度变慢。
  • @S.Lott 例如,如果系统命令是 sftp,那么您可能希望并行运行有限数量的进程。鉴于问题引用了系统命令,我对数据库的引用可能没有帮助,但这就是我过去遇到这种情况的原因。

标签: python


【解决方案1】:

如果您仍然调用子进程,我认为不需要使用线程池。使用subprocess 模块的基本实现是

import subprocess
import os
import time

files = <list of file names>
command = "/bin/touch"
processes = set()
max_processes = 5

for name in files:
    processes.add(subprocess.Popen([command, name]))
    if len(processes) >= max_processes:
        os.wait()
        processes.difference_update([
            p for p in processes if p.poll() is not None])

在 Windows 上,os.wait() 不可用(也没有任何其他等待任何子进程终止的方法)。您可以通过以特定间隔进行轮询来解决此问题:

for name in files:
    processes.add(subprocess.Popen([command, name]))
    while len(processes) >= max_processes:
        time.sleep(.1)
        processes.difference_update([
            p for p in processes if p.poll() is not None])

睡眠时间取决于子进程的预期执行时间。

【讨论】:

  • 感谢您编码我的建议:) +1 给你
  • 谢谢!这似乎是我需要的——而且非常简单。但是我应该指出我在 Windows 上,似乎不支持 os.wait() 。有什么简单的解决方法吗?
  • @user476983:不幸的是,Windows 不允许等待 any 子节点的终止。您可以通过每秒轮询所有子进程一次左右来解决此问题(取决于子进程的执行需要多长时间)。
  • “processes.difference_update( p for p in processes if p.poll() is not None)”这行似乎有问题。这会导致“RuntimeError:在迭代期间设置更改的大小”
  • @Mannaggia:您建议的代码的括号不匹配。将生成器表达式分配给临时变量应该没有什么区别。将其转换为列表理解应该可以解决问题——我会更新答案。 (该错误可能是由罕见的竞争条件引起的。编辑代码并重试不会告诉您竞争条件是否已修复。它可能在特定的运行中没有发生,但会在下一次运行中再次发生。 )
【解决方案2】:

Sven Marnach 的回答几乎是正确的,但有一个问题。如果最后一个 max_processes 进程结束,主程序将尝试启动另一个进程,for 循环将结束。这将关闭主进程,而主进程又可以关闭子进程。对我来说,这种行为发生在 screen 命令中。

Linux中的代码会是这样的(并且只适用于python2.7):

import subprocess
import os
import time

files = <list of file names>
command = "/bin/touch"
processes = set()
max_processes = 5

for name in files:
    processes.add(subprocess.Popen([command, name]))
    if len(processes) >= max_processes:
        os.wait()
        processes.difference_update(
            [p for p in processes if p.poll() is not None])
#Check if all the child processes were closed
for p in processes:
    if p.poll() is None:
        p.wait()

【讨论】:

  • 我认为您应该删除它并通过编辑将其添加到 Sven 的答案中。这是 SO 上的错误形式吗?
  • Glory be to those that answer
【解决方案3】:

您需要将Semaphore 对象与threads 结合起来。 Semaphore 是一个对象,可让您限制在给定代码段中运行的线程数。在这种情况下,我们将使用信号量来限制可以运行 os.system 调用的线程数。

首先我们导入我们需要的模块:

#!/usr/bin/python

import threading
import os

接下来我们创建一个 Semaphore 对象。这里的数字四是一次可以获取信号量的线程数。这限制了一次可以运行的子进程的数量。

semaphore = threading.Semaphore(4)

这个函数只是把对子进程的调用封装在对信号量的调用中。

def run_command(cmd):
    semaphore.acquire()
    try:
        os.system(cmd)
    finally:
        semaphore.release()

如果您使用的是 Python 2.6+,这会变得更加简单,因为您可以使用“with”语句来执行获取和释放调用。

def run_command(cmd):
    with semaphore:
        os.system(cmd)

最后,为了证明它按预期工作,我们将调用“sleep 10”命令八次。

for i in range(8):
    threading.Thread(target=run_command, args=("sleep 10", )).start()

使用“时间”程序运行脚本表明它只需要 20 秒,因为两个批次的四个睡眠是并行运行的。

aw@aw-laptop:~/personal/stackoverflow$ time python 4992400.py 

real    0m20.032s                                                                                                                                                                   
user    0m0.020s                                                                                                                                                                    
sys     0m0.008s 

【讨论】:

  • 我不喜欢为此使用线程。它们是完全没有必要的——无论如何你都在启动子流程。
  • 线程虽然很便宜,但信号量使跟踪正在运行的进程数量变得非常简单。
  • 是的,代码看起来不错,尤其是在使用with 语句时。一个缺点是在进程非常多的情况下,你会无条件地先启动一大堆线程。
【解决方案4】:

我将 Sven 和 Thuener 的解决方案合并为一个等待尾随进程并在其中一个进程崩溃时停止的解决方案:

def removeFinishedProcesses(processes):
    """ given a list of (commandString, process), 
        remove those that have completed and return the result 
    """
    newProcs = []
    for pollCmd, pollProc in processes:
        retCode = pollProc.poll()
        if retCode==None:
            # still running
            newProcs.append((pollCmd, pollProc))
        elif retCode!=0:
            # failed
            raise Exception("Command %s failed" % pollCmd)
        else:
            logging.info("Command %s completed successfully" % pollCmd)
    return newProcs

def runCommands(commands, maxCpu):
            processes = []
            for command in commands:
                logging.info("Starting process %s" % command)
                proc =  subprocess.Popen(shlex.split(command))
                procTuple = (command, proc)
                processes.append(procTuple)
                while len(processes) >= maxCpu:
                    time.sleep(.2)
                    processes = removeFinishedProcesses(processes)

            # wait for all processes
            while len(processes)>0:
                time.sleep(0.5)
                processes = removeFinishedProcesses(processes)
            logging.info("All processes completed")

【讨论】:

    【解决方案5】:

    你要求的是一个线程池。有固定数量的线程可用于执行任务。当没有运行任务时,它会在任务队列中等待,以便获得一段新的代码来执行。

    有这个thread pool module,但是有评论说它还没有被认为是完整的。可能还有其他包,但这是我找到的第一个。

    【讨论】:

      【解决方案6】:

      如果您正在运行系统命令,您可以使用子流程模块创建流程实例,根据需要调用它们。应该不需要线程(它的非pythonic)并且多进程对于这个任务来说似乎有点过分了。

      【讨论】:

        【解决方案7】:

        此答案与此处提供的其他答案非常相似,但它使用列表而不是集合。 出于某种原因,在使用这些答案时,我遇到了关于集合大小变化的运行时错误。

        from subprocess import PIPE
        import subprocess
        import time
        
        
        def submit_job_max_len(job_list, max_processes):
          sleep_time = 0.1
          processes = list()
          for command in job_list:
            print 'running {n} processes. Submitting {proc}.'.format(n=len(processes),
                proc=str(command))
            processes.append(subprocess.Popen(command, shell=False, stdout=None,
              stdin=PIPE))
            while len(processes) >= max_processes:
              time.sleep(sleep_time)
              processes = [proc for proc in processes if proc.poll() is None]
          while len(processes) > 0:
            time.sleep(sleep_time)
            processes = [proc for proc in processes if proc.poll() is None]
        
        
        cmd = '/bin/bash run_what.sh {n}'
        job_list = ((cmd.format(n=i)).split() for i in range(100))
        submit_job_max_len(job_list, max_processes=50)
        

        【讨论】:

        • 快速查询人。我正在尝试您的解决方案。基本上是在上面列出的解决方案中尝试使用多个命令传递一个 shell 脚本。 range(100) 中提到的值,它只执行 1 个命令 100 次。这基本上不满足并行方法应该是什么。如果我错了,请纠正我;刚开始 Python 有很多困惑。感谢您的帮助。
        猜你喜欢
        • 2023-03-06
        • 1970-01-01
        • 2022-11-02
        • 1970-01-01
        • 2011-03-09
        • 1970-01-01
        • 2019-03-05
        • 2012-05-25
        • 2020-07-21
        相关资源
        最近更新 更多