【问题标题】:How to run Python subprocess and stream but also filter stdout and stderr?如何运行 Python 子进程和流,同时过滤标准输出和标准错误?
【发布时间】:2016-03-29 07:27:28
【问题描述】:

我有一个类似服务器的应用程序,我想从 Python 运行。它永远不会停止,直到用户中断它。我想在应用程序运行时不断地将 stdout 和 stderr 重定向到父级。幸运的是,这正是 subprocess.run 所做的。

壳牌:

$ my-app
1
2
3
...

wrapper.py:

import subprocess
subprocess.run(['my-app'])

执行wrapper.py:

$ python wrapper.py
1
2
3
...

我相信这要归功于subprocess.run 从父进程继承了 stdout 和 stderr 文件描述。很好。

但现在我需要在应用输出特定行时做一些事情。想象一下,当输出行将包含 4 时,我想运行任意 Python 代码:

$ python wrapper.py
1
2
3
4   <-- here I want to do something
...

或者我想从输出中删除一些行:

$ python wrapper.py   <-- allowed only odd numbers
1
3
...

我想我可以有一个过滤功能,我会以某种方式将它挂接到subprocess.run,它会在输出的每一行被调用,不管它是标准输出还是标准错误:

def filter_fn(line):
    if line ...:
        return line.replace(...
    ...

但是如何实现呢?如何将此类或类似的功能挂接到subprocess.run 调用中?


注意:我不能使用 sh 库,因为它对 Windows 的支持为零。

【问题讨论】:

标签: python python-3.x subprocess


【解决方案1】:

如果您希望能够为子进程处理 stdout 或 stderr,只需将 subprocess.PIPE 传递给参数 stdout(分别为 stderr)。然后,您可以以proc.stdout 的身份访问子进程的输出流,默认情况下以字节流的形式访问,但您可以使用universal_newlines = True 将其作为字符串获取。示例:

import subprocess
app = subprocess.Popen(['my-app'], stdout = subprocess.PIPE, universal_newlines = True)
for line in app.stdout:
    if line.strip() == '4':
        # special processing
    else:
        sys.stdout.write(line)

你必须注意的是,为了能够在子进程写入后立即处理输出,子进程必须在每一行之后刷新输出。默认情况下,stdout 在定向到终端时是行缓冲的 - 每行都打印在换行符上 - 但在定向到文件或管道时是 size buffered ,这意味着它仅每 8k 或 16k 刷新一次字符。

在这种情况下,无论你对调用者大小做什么,只有在程序完成时才会得到标准输出。

【讨论】:

  • @HonzaJavorek:一模一样,加参数stderr = subprocess.PIPE和处理app.stderr
  • 您也可以调用subprocess.run(['my-app'], stdout=subprocess.PIPE, stderr=subprocess.STDOUT) 将它们发送到同一个句柄,这意味着当您循环访问line in app.stdout 时,您将按照终端显示的顺序获得所有输出。请注意,这意味着您将无法以不同的方式处理 STDOUT 和 STERR。
  • 我可以同时分开处理吗?
  • @HonzaJavorek,对不起,我不明白这个问题。你想如何分别和同时处理它们?
  • 恐怕我不能给你一个好的答案。我完全不确定 subprocess 在 Windows 上的工作方式,但是由于 subprocess 在 Linux 上的工作方式,一旦你启动了 subprocess,你要么将它们发送到同一个管道,此时你无法分辨哪个是哪个,或者您将管道分开但丢失了有关哪个文本首先出现的信息。如果 Windows 明显不同,我会感到惊讶。
【解决方案2】:

我相信这段代码可以做到。前面的答案没有解决同时从两个流中读取需要异步的问题。否则,其他答案可能适用于过滤标准输出,然后在标准输出之后执行标准错误。

这是 python 3.8,它为 asyncio 提供了更多描述性的方法名称。

2021 年 8 月 25 日更新:使用 asyncio.run 和 asyncio.gather 作为更高级别,更容易理解函数,而不是直接操作 asyncio 循环。

import sys
import asyncio


async def output_filter(input_stream, output_stream):
    while not input_stream.at_eof():
        output = await input_stream.readline()
        if not output.startswith(b"filtered"):
            output_stream.buffer.write(output)
            output_stream.flush()


async def run_command(command):
    process = await asyncio.create_subprocess_exec(
        *command, stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.PIPE
    )

    await asyncio.gather(
        output_filter(process.stderr, sys.stderr),
        output_filter(process.stdout, sys.stdout),
    )
    await process.wait()


def main():
    asyncio.run(run_command(["python", "sample_process.py"]))


if __name__ == "__main__":
    main()

【讨论】:

    猜你喜欢
    • 1970-01-01
    • 2013-11-04
    • 2011-09-01
    • 1970-01-01
    • 2021-03-03
    • 2011-06-26
    • 2011-05-19
    • 2011-02-23
    相关资源
    最近更新 更多