【问题标题】:How to run multiple shell jobs using separate threads and wait for each to finish after executing each simultaneously?如何使用单独的线程运行多个 shell 作业并在同时执行每个线程后等待每个线程完成?
【发布时间】:2020-03-27 15:21:12
【问题描述】:

我正在编写一个需要处理大量数据的脚本。我意识到脚本的并行组件实际上对具有大量单独数据点的实例没有帮助。我将创建临时文件并并行运行它们。我在qsub 上运行它,所以我将通过-pe threaded $N_JOBS 分配一定数量的线程(在这个小例子中是4)。

我的最终目标是使用我分配的线程之一启动每个进程,然后等待所有作业完成后再继续。

但是,我只使用过process = subprocess.Popenprocess.communicate() 来运行shell 作业。由于僵尸进程,我过去在使用process.wait() 时遇到了一些麻烦。

如何修改我的run 函数来启动作业,而不是等待完成,然后开始下一个作业,然后在所有作业运行后,等待所有作业完成?

如果不清楚,请告诉我,我可以更好地解释。在下面的示例中(可能是一个糟糕的示例?),我想使用 4 个单独的线程(我不知道如何设置这个 b/c 我只为简单的并行化做过joblib.Parallel)其中每个线程运行命令echo '$THREAD' && sleep 1。所以最终它应该花费 1 秒多一点而不是 ~4 秒。

我找到了这篇文章:Python threading multiple bash subprocesses?,但我不确定如何使用我的run 脚本来适应我的情况。

import sys, subprocess, time 

# Number of jobs
N_JOBS=4

# Run command
def run(
    cmd,
    popen_kws=dict(),
    ):

    # Run
    f_stdout = subprocess.PIPE
    f_stderr = subprocess.PIPE

    # Execute the process
    process_ = subprocess.Popen(cmd, shell=True, stdout=f_stdout, stderr=f_stderr, **popen_kws) 
    # Wait until process is complete and return stdout/stderr
    stdout_, stderr_ = process_.communicate() # Use this .communicate instead of .wait to avoid zombie process that hangs due to defunct. Removed timeout b/c it's not available in Python 2

    # Return code
    returncode_ = process_.returncode

    return {"process":process_, "stdout":stdout_, "stderr":stderr_, "returncode":returncode_}

# Commands
cmds = list(map(lambda x:"echo '{}' && sleep 1".format(x), range(1, N_JOBS+1)))
# ["echo '1'", "echo '2'", "echo '3'", "echo '4'"]

# Start time 
start_time = time.time()
results = dict()
for thread, cmd in enumerate(cmds, start=1):
    # Run command but don't wait for it to finish (Currently, it's waiting to finish)
    results[thread] = run(cmd)

# Now wait until they are all finished
print("These jobs took {} seconds\n".format(time.time() - start_time))
print("Here's the results:", *results.items(), sep="\n")
print("\nContinue with script. .. ...")

# These jobs took 4.067937850952148 seconds

# Here's the results:
# (1, {'process': <subprocess.Popen object at 0x1320766d8>, 'stdout': b'1\n', 'stderr': b'', 'returncode': 0})
# (2, {'process': <subprocess.Popen object at 0x1320547b8>, 'stdout': b'2\n', 'stderr': b'', 'returncode': 0})
# (3, {'process': <subprocess.Popen object at 0x132076ba8>, 'stdout': b'3\n', 'stderr': b'', 'returncode': 0})
# (4, {'process': <subprocess.Popen object at 0x132076780>, 'stdout': b'4\n', 'stderr': b'', 'returncode': 0})

# Continue with script. .. ...

我已经尝试按照 multiprocessing https://docs.python.org/3/library/multiprocessing.html 上的文档进行操作,但要根据我的情况调整它真的很令人困惑:

# Run command
def run(
    cmd,
    errors_ok=False,
    popen_kws=dict(),
    ):

    # Run
    f_stdout = subprocess.PIPE
    f_stderr = subprocess.PIPE

    # Execute the process
    process_ = subprocess.Popen(cmd, shell=True, stdout=f_stdout, stderr=f_stderr, **popen_kws) 

    return process_

# Commands
cmds = list(map(lambda x:"echo '{}' && sleep 0.5".format(x), range(1, N_JOBS+1)))
# ["echo '1'", "echo '2'", "echo '3'", "echo '4'"]

# Start time 
start_time = time.time()
results = dict()
for thread, cmd in enumerate(cmds, start=1):
    # Run command but don't wait for it to finish (Currently, it's waiting to finish)
    p = multiprocessing.Process(target=run, args=(cmd,))
    p.start()
    p.join()
    results[thread] = p

【问题讨论】:

    标签: python multithreading parallel-processing subprocess popen


    【解决方案1】:

    你快到了。使用多处理的最简单方法是使用multiprocessing.Pool 对象,如multiprocessing documentation 的介绍中所示,然后使用map()starmap() 您的一组函数。 map()starmap() 之间的最大区别在于 map() 假定您的函数采用单个参数(因此您可以传递一个简单的迭代器),而 starmap() 需要嵌套的迭代器参数。

    对于您的示例,这将起作用(run() 函数在很大程度上被跳过,尽管我将签名更改为命令和参数列表,因为将字符串传递给系统调用通常是个坏主意):

    from multiprocessing import Pool
    
    N_JOBS = 4
    
    def run(cmd, *args):
        return cmd + str(args)
    
    cmds = [
        ('echo', 'hello', 1, 3, 4),
        ('ls', '-l', '-r'),
        ('sleep', 3),
        ('pwd', '-P'),
        ('whoami',),
    ]
    
    results = []
    with Pool(N_JOBS) as p:
        results = p.starmap(run, cmds)
    
    for r in results:
        print(r)
    

    作业数量不必与命令相同; Pool 中的子进程将根据需要被重用以运行函数。

    【讨论】:

    • 使用starmap 我得到以下错误:# TypeError: run() takes from 1 to 3 positional arguments but 21 were givenmap 我得到MaybeEncodingError: Error sending result: '[{'process': &lt;subprocess.Popen object at 0x132efe9e8&gt;, 'stdout': b'3\n', 'stderr': b'', 'returncode': 0}]'. Reason: 'TypeError("can't pickle _thread.lock objects",)'
    • 您正在使用原始的 run 函数,该函数需要 1 个必需参数。使用map,您永远无法传递 2 个参数,因此无法传递 popen_kwds,但它会起作用。 starmap 需要嵌套列表(例如 [['ls -l',{'shell':True}], ['pwd'],...])。 return 只能传递 pickle-able 对象,因此 Popen 对象不能作为 return。只需返回标准输出、标准输入和返回码值;他们会腌制得很好。
    猜你喜欢
    • 2012-04-09
    • 2021-06-21
    • 2021-10-29
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2010-11-18
    • 2011-05-10
    相关资源
    最近更新 更多