【Question title】: Using asyncio to wait for results from subprocess
【Posted】: 2020-09-07 18:56:13
【Question description】:

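My Python script contains a loop that uses subprocess to run commands outside the script. Each subprocess is independent. I listen to the returned message in case there is an error; I can't ignore the result of the subprocess. Here is the script without asyncio (I've replaced my computationally expensive call with sleep):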

from subprocess import PIPE  # https://docs.python.org/3/library/subprocess.html
import subprocess

def go_do_something(index: int) -> None:
    """
    This function takes a long time
    Nothing is returned
    Each instance is independent
    """
    process = subprocess.run(["sleep","2"],stdout=PIPE,stderr=PIPE,timeout=20)
    stdout = process.stdout.decode("utf-8")
    stderr = process.stderr.decode("utf-8")
    if "error" in stderr:
        print("error for "+str(index))
    return

def my_long_func(val: int) -> None:
    """
    This function contains a loop
    Each iteration of the loop calls a function
    Nothing is returned
    """
    for index in range(val):
        print("index = "+str(index))
        go_do_something(index)

# run the script
my_long_func(3) # launch three tasks

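I think I could use asyncio to speed this up, since the Python script is just waiting for the external subprocess to complete. I don't think threading or multiprocessing is necessary, though either could also speed up execution. Using a task queue (e.g., Celery) is another option.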

I tried implementing the asyncio approach, but I'm missing something, since the following attempt doesn't change the overall execution time:

import asyncio
from subprocess import PIPE  # https://docs.python.org/3/library/subprocess.html
import subprocess


async def go_do_something(index: int) -> None:
    """
    This function takes a long time
    Nothing is returned
    Each instance is independent
    """
    process = subprocess.run(["sleep","2"],stdout=PIPE,stderr=PIPE,timeout=20)
    stdout = process.stdout.decode("utf-8")
    stderr = process.stderr.decode("utf-8")
    if "error" in stderr:
        print("error for "+str(index))
    return

def my_long_func(val: int) -> None:
    """
    This function contains a loop
    Each iteration of the loop calls a function
    Nothing is returned
    """
    # https://docs.python.org/3/library/asyncio-eventloop.html
    loop = asyncio.get_event_loop()
    tasks = []
    for index in range(val):
        task = go_do_something(index)
        tasks.append(task)
    # https://docs.python.org/3/library/asyncio-task.html
    tasks = asyncio.gather(*tasks)
    loop.run_until_complete(tasks)
    loop.close()
    return

my_long_func(3) # launch three tasks

If I want to monitor the output of each subprocess but not block while each one runs, can I benefit from asyncio? Or does this situation require something like multiprocessing or Celery?

【Comments】:

    Tags: python-3.x concurrency subprocess python-asyncio


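    【Solution 1】:

    Try executing the commands with asyncio instead of subprocess. subprocess.run is a blocking call, so a coroutine that calls it never yields to the event loop and the tasks still run one after another.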
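To see why the attempt in the question gains nothing, here is a minimal sketch that compares a coroutine making a blocking call with one that awaits. It uses time.sleep as a stand-in for subprocess.run (an assumption made only to keep the demo self-contained); the helper names blocking_task, cooperative_task and timed are hypothetical.

```python
import asyncio
import time


async def blocking_task() -> None:
    # Blocks the whole event loop, just as subprocess.run would.
    time.sleep(0.5)


async def cooperative_task() -> None:
    # Suspends this coroutine and lets the loop run the other tasks.
    await asyncio.sleep(0.5)


async def timed(factory, n: int = 3) -> float:
    """Run n coroutines from `factory` with gather and return elapsed seconds."""
    start = time.perf_counter()
    await asyncio.gather(*(factory() for _ in range(n)))
    return time.perf_counter() - start


if __name__ == "__main__":
    print(f"blocking:    {asyncio.run(timed(blocking_task)):.2f}s")
    print(f"cooperative: {asyncio.run(timed(cooperative_task)):.2f}s")
```

The blocking version should take roughly n times the delay, while the cooperative one should take roughly a single delay, which is the behaviour the asyncio subprocess API provides.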

    Define a run() function:

    import asyncio
    
    async def run(cmd: str):
        proc = await asyncio.create_subprocess_shell(
            cmd,
            stderr=asyncio.subprocess.PIPE,
            stdout=asyncio.subprocess.PIPE
        )
    
        stdout, stderr = await proc.communicate()
    
        print(f'[{cmd!r} exited with {proc.returncode}]')
        if stdout:
            print(f'[stdout]\n{stdout.decode()}')
        if stderr:
            print(f'[stderr]\n{stderr.decode()}')
    

    Then you can call it like any other async function:

    asyncio.run(run('sleep 2'))
    
    #=>
    
    ['sleep 2' exited with 0]
    

    The example is taken from the official documentation. It can also be found here.
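A single asyncio.run(run('sleep 2')) call still waits the full two seconds; the speed-up only appears when several commands are awaited together. The following is a minimal sketch of that step, assuming a variant of run() that returns its results instead of printing them. The run_all helper is hypothetical and not part of the documentation example.

```python
import asyncio
import time


async def run(cmd: str) -> tuple[int, str, str]:
    """Run a shell command; return (exit code, stdout, stderr)."""
    proc = await asyncio.create_subprocess_shell(
        cmd,
        stdout=asyncio.subprocess.PIPE,
        stderr=asyncio.subprocess.PIPE,
    )
    stdout, stderr = await proc.communicate()
    return proc.returncode, stdout.decode(), stderr.decode()


async def run_all(cmds: list[str]) -> list[tuple[int, str, str]]:
    # gather starts every command before waiting on any of them and
    # returns the results in the same order as `cmds`.
    return await asyncio.gather(*(run(cmd) for cmd in cmds))


if __name__ == "__main__":
    start = time.perf_counter()
    results = asyncio.run(run_all(["sleep 2"] * 3))
    print(f"{len(results)} commands in {time.perf_counter() - start:.1f}s")
```

Because the three sleeps overlap, the elapsed time should be close to that of one command rather than the sum of all three.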

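    【Comments】:

    • Thanks for pointing me to that documentation; I wasn't aware of this integration. When I tried to adapt the example to my case, I failed. Specifically, I added async def run(cmd) to the first example above and put await run("sleep 2") inside go_do_something(index). This produces an error message pointing at await run("sleep 2"): SyntaxError: invalid syntax
    • Here is what I tried based on your suggestion -- pastebin.com/AU9YgbGG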
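A likely cause of that SyntaxError (an assumption, since the pastebin code is not reproduced here) is that go_do_something in the first example is a plain def, and await is only valid inside an async def. A minimal sketch of one way to wire it up follows; the cmd parameter is hypothetical and added only so the command can be swapped.

```python
import asyncio


async def run(cmd: str) -> int:
    """Run a shell command and return its exit code."""
    proc = await asyncio.create_subprocess_shell(cmd)
    return await proc.wait()


# Must be `async def`: `await` is a syntax error inside a plain `def`.
async def go_do_something(index: int, cmd: str = "sleep 2") -> int:
    print(f"index = {index}")
    return await run(cmd)


def my_long_func(val: int, cmd: str = "sleep 2") -> list[int]:
    """Synchronous entry point; asyncio.run bridges into the coroutines."""
    async def main() -> list[int]:
        return await asyncio.gather(
            *(go_do_something(i, cmd) for i in range(val))
        )
    return asyncio.run(main())


if __name__ == "__main__":
    print(my_long_func(3))
```

The outermost function stays synchronous, so the rest of the script can call my_long_func(3) exactly as before.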
    【Solution 2】:

    @ronginat pointed me to https://asyncio.readthedocs.io/en/latest/subprocess.html, which I was able to adapt to the situation I was looking for:

    import asyncio
    
    async def run_command(*args):
        # Create subprocess
        process = await asyncio.create_subprocess_exec(
            *args,
            # stdout must be a pipe to be accessible as process.stdout
            stdout=asyncio.subprocess.PIPE)
        # Wait for the subprocess to finish
        stdout, stderr = await process.communicate()
        # Return stdout
        return stdout.decode().strip()
    
    async def go_do_something(index: int) -> str:
        print('index=',index)
        res = await run_command('sleep','2')
        return res
    
    def my_long_func(val: int) -> None:
        async def run_all():
            # gather is awaited inside the running event loop
            return await asyncio.gather(
                *(go_do_something(indx) for indx in range(val))
            )
        # asyncio.run creates the loop, runs the coroutine, then closes the loop
        reslt = asyncio.run(run_all())
        print(reslt)
    
    my_long_func(3) # launch three tasks
    

    Even with three sleeps of 2 seconds each, the total execution time is just over 2 seconds, and I get the stdout from each subprocess.
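The original script also captured stderr, checked it for "error", and applied a 20-second timeout, none of which the version above does. The following is a minimal sketch of how those pieces might be restored with asyncio, assuming asyncio.wait_for as the timeout mechanism; the timeout parameter, the boolean return value and the variadic args on go_do_something are illustrative choices, not part of the answer.

```python
import asyncio


async def run_command(*args: str, timeout: float = 20.0) -> tuple[int, str, str]:
    """Run a command; return (exit code, stdout, stderr) or raise on timeout."""
    process = await asyncio.create_subprocess_exec(
        *args,
        stdout=asyncio.subprocess.PIPE,
        stderr=asyncio.subprocess.PIPE,
    )
    try:
        stdout, stderr = await asyncio.wait_for(process.communicate(), timeout)
    except asyncio.TimeoutError:
        # Do not leave the child running after giving up on it.
        process.kill()
        await process.wait()
        raise
    return process.returncode, stdout.decode(), stderr.decode()


async def go_do_something(index: int, *args: str) -> bool:
    """Return True when the command's stderr does not mention 'error'."""
    _, _, stderr = await run_command(*args)
    if "error" in stderr:
        print(f"error for {index}")
        return False
    return True


async def main(val: int) -> list[bool]:
    return await asyncio.gather(
        *(go_do_something(i, "sleep", "2") for i in range(val))
    )


if __name__ == "__main__":
    print(asyncio.run(main(3)))
```

Note that gather propagates the first exception by default, so a timeout in one command surfaces as an exception from main; passing return_exceptions=True to gather would collect it alongside the other results instead.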

    【Comments】:
