【问题标题】:Reading output from child process using python使用python从子进程读取输出
【发布时间】:2014-02-10 10:23:05
【问题描述】:

背景

我正在使用subprocess 模块从python 启动一个进程。我希望能够在写入/缓冲后立即访问输出(stdout、stderr)。

  • 解决方案必须支持 Windows 7。我也需要 Unix 系统的解决方案,但我怀疑 Windows 的情况更难解决。
  • 该解决方案应支持 Python 2.6。我目前仅限于 Python 2.6,但仍然赞赏使用更高版本 Python 的解决方案。
  • 该解决方案不应使用第三方库。理想情况下,我会喜欢使用标准库的解决方案,但我愿意接受建议。
  • 该解决方案必须适用于几乎所有流程。假设无法控制正在执行的进程。

子进程

例如,假设我想通过subprocess 运行一个名为counter.py 的python 文件。 counter.py的内容如下:

import sys

for index in range(10):

    # Write data to standard out.
    sys.stdout.write(str(index))

    # Push buffered data to disk.
    sys.stdout.flush()

父进程

负责执行counter.py例子的父进程如下:

import subprocess

command = ['python', 'counter.py']

process = subprocess.Popen(
    cmd,
    bufsize=1,
    stdout=subprocess.PIPE,
    stderr=subprocess.PIPE,
    ) 

问题

使用counter.py 示例,我可以在流程完成之前访问数据。这很棒!这正是我想要的。但是,删除 sys.stdout.flush() 调用会阻止数据在我想要的时候被访问。这是不好的!这正是我不想要的。我的理解是flush() 调用强制将数据写入磁盘,并且在数据写入磁盘之前它仅存在于缓冲区中。请记住,我希望能够运行几乎任何进程。我不希望该过程执行这种刷新,但我仍然希望数据实时可用(或接近它)。有没有办法做到这一点?

关于父进程的简要说明。您可能会注意到我使用bufsize=0 进行行缓冲。我希望这会导致每一行都刷新到磁盘,但它似乎不是那样工作的。这个论点如何运作?

您还会注意到我正在使用subprocess.PIPE。这是因为它似乎是在父进程和子进程之间产生 IO 对象的唯一值。我通过查看subprocess 模块中的Popen._get_handles 方法得出了这个结论(这里我指的是Windows 定义)。有两个重要的变量,c2preadc2pwrite,它们是根据传递给 Popen 构造函数的 stdout 值设置的。例如,如果未设置 stdout,则未设置 c2pread 变量。使用文件描述符和类文件对象时也是如此。我真的不知道这是否重要,但我的直觉告诉我,我想要读写 IO 对象来实现我想要实现的目标——这就是我选择subprocess.PIPE 的原因。如果有人能更详细地解释这一点,我将不胜感激。同样,如果有令人信服的理由使用 subprocess.PIPE 以外的其他内容,我会全力以赴。

从子进程中获取数据的方法

import time
import subprocess
import threading
import Queue


class StreamReader(threading.Thread):
    """
    Threaded object used for reading process output stream (stdout, stderr).   
    """

    def __init__(self, stream, queue, *args, **kwargs):
        super(StreamReader, self).__init__(*args, **kwargs)
        self._stream = stream
        self._queue = queue

        # Event used to terminate thread. This way we will have a chance to 
        # tie up loose ends. 
        self._stop = threading.Event()

    def stop(self):
        """
        Stop thread. Call this function to terminate the thread. 
        """
        self._stop.set()

    def stopped(self):
        """
        Check whether the thread has been terminated.
        """
        return self._stop.isSet()

    def run(self):
        while True:
            # Flush buffered data (not sure this actually works?)
            self._stream.flush()

            # Read available data.
            for line in iter(self._stream.readline, b''):
                self._queue.put(line)

            # Breather.
            time.sleep(0.25)

            # Check whether thread has been terminated.
            if self.stopped():
                break


cmd = ['python', 'counter.py']

process = subprocess.Popen(
    cmd,
    bufsize=1,
    stdout=subprocess.PIPE,
    )

stdout_queue = Queue.Queue()
stdout_reader = StreamReader(process.stdout, stdout_queue)
stdout_reader.daemon = True
stdout_reader.start()

# Read standard out of the child process whilst it is active.  
while True:

    # Attempt to read available data.  
    try:
        line = stdout_queue.get(timeout=0.1)
        print '%s' % line

    # If data was not read within time out period. Continue. 
    except Queue.Empty:
        # No data currently available.
        pass

    # Check whether child process is still active.
    if process.poll() != None:

        # Process is no longer active.
        break

# Process is no longer active. Nothing more to read. Stop reader thread.
stdout_reader.stop()

这里我正在执行从线程中的子进程读取标准的逻辑。这允许在数据可用之前读取被阻塞的情况。我们不是等待一些可能很长的时间,而是检查是否有可用数据,在超时期限内读取,如果没有则继续循环。

我还尝试了另一种使用非阻塞读取的方法。这种方法使用ctypes 模块来访问Windows 系统调用。请注意,我并不完全理解我在这里所做的事情——我只是试图理解我在其他帖子中看到的一些示例代码。在任何情况下,以下 sn-p 都不能解决缓冲问题。我的理解是,这只是对抗可能较长的阅读时间的另一种方式。

import os
import subprocess

import ctypes
import ctypes.wintypes
import msvcrt

cmd = ['python', 'counter.py']

process = subprocess.Popen(
    cmd,
    bufsize=1,
    stdout=subprocess.PIPE,
    )


def read_output_non_blocking(stream):
    data = ''
    available_bytes = 0

    c_read = ctypes.c_ulong()
    c_available = ctypes.c_ulong()
    c_message = ctypes.c_ulong()

    fileno = stream.fileno()
    handle = msvcrt.get_osfhandle(fileno)

    # Read available data.
    buffer_ = None
    bytes_ = 0
    status = ctypes.windll.kernel32.PeekNamedPipe(
        handle,
        buffer_,
        bytes_,
        ctypes.byref(c_read),
        ctypes.byref(c_available),
        ctypes.byref(c_message),
        )

    if status:
        available_bytes = int(c_available.value)

    if available_bytes > 0:
        data = os.read(fileno, available_bytes)
        print data

    return data

while True:

    # Read standard out for child process.
    stdout = read_output_non_blocking(process.stdout)
    print stdout

    # Check whether child process is still active.
    if process.poll() != None:

        # Process is no longer active.
        break

非常感谢您的评论。

干杯

【问题讨论】:

  • 我不确定我是否完全理解您的问题,但问题"Python subprocess reading" 可能值得一看。
  • @iljau:谢谢。这是一个类似的问题,EOF 条件可能会在这里发挥作用,但对该问题的回答并不能真正提供解决方案。我认为这更多的是关于如何控制缓冲的问题。我需要一些方法来强制数据更频繁地刷新(或写入磁盘)。或者也许有一个完全不同的解决方案。我在想套接字可能有用吗?我还在调查。另一方面 - 让操作系统做它的事情也许更明智。
  • @iljau:再次感谢您的努力。这个问题有一些有用的回答。但是,selectfcntl 不适用于 Windows 平台(支持 select,但仅使用 socket 对象)。 asyncproctwistedtornado 都是第三方软件包,但我还是应该看看这些,即使只是为了教育目的。 PYTHONUNBUFFERED 环境变量有效,但前提是可执行文件(子进程)是 python 脚本。不错!
  • 现在这是一个长镜头,但文章"Asynchronous I/O in Windows for Unix Programmers"可能会提供一些有用的指导。

标签: python buffer subprocess pipe flush


【解决方案1】:

这里的问题是由 child 进程进行缓冲。您的subprocess 代码已经可以正常工作,但是如果您有一个缓冲其输出的子进程,那么subprocess 管道对此无能为力。

我怎么强调都不为过:你看到的缓冲延迟是子进程的责任,它如何处理缓冲与subprocess 模块无关。

你已经发现了这个;这就是为什么在子进程中添加sys.stdout.flush() 可以更快地显示数据;子进程在将其发送到sys.stdout 管道 1 之前使用缓冲 I/O(用于收集写入数据的内存缓存)。

sys.stdout 连接到终端时,Python 会自动使用行缓冲;每当写入换行符时,缓冲区就会刷新。使用管道时,sys.stdout 不连接到终端,而是使用固定大小的缓冲区。

现在,可以告诉 Python 子进程 以不同的方式处理缓冲;您可以设置环境变量或使用命令行开关来更改它如何使用缓冲 sys.stdout(以及 sys.stderrsys.stdin)。来自Python command line documentation

-u
强制标准输入、标准输出和标准错误完全无缓冲。在重要的系统上,还要将 stdin、stdout 和 stderr 置于二进制模式。

[...]

PYTHONUNBUFFERED
如果将其设置为非空字符串,则等效于指定 -u 选项。

如果您正在处理 Python 进程的子进程,并且遇到了这些子进程的缓冲问题,则需要查看这些进程的文档以查看它们是否可以切换到使用无缓冲 I/O,或者切换到更理想的缓冲策略。

您可以尝试的一件事是使用script -c command 为子进程提供伪终端。然而,这是一个 POSIX 工具,可能在 Windows 上不可用。


1.需要注意的是,在刷新管道时,没有数据“写入磁盘”;所有数据都完全保留在内存中。 I/O 缓冲区只是内存缓存,通过处理更大块的数据来获得 I/O 的最佳性能。只有当你有一个基于磁盘的文件对象时,fileobj.flush() 才会将任何缓冲区推送到操作系统,这通常意味着数据确实写入了磁盘。

【讨论】:

  • 谢谢!你的描述很好,很清楚。不幸的是,我不能假设子进程是 python 进程,因此PYTHONUNBUFFERED 解决方案仅适用于特定情况。您提到当sys.stdout 连接到终端时,Python 会自动使用行缓冲。我尝试了 shell=True 选项,希望它可以通过 shell 运行子进程来强制行缓冲,但这不起作用。你知道这是如何工作的吗?另外,我有没有办法告诉系统以我想要的方式处理缓冲?
  • shell 不是终端。 Python 使用isatty() system call 来确定流是否是 TTY(终端)。同样,Python 的行为方式是特定于应用程序的,其他应用程序可能会再次选择不同的行为方式。您可以使用script command 让子进程认为我们已连接到 TTY。
  • @MartijnPieters 我爱你! Python 是在我的应用程序中缓冲其输出的唯一进程之一,环境变量解决方案对我来说效果很好。我使用 process.StartInfo.EnvironmentVariables.Add("PYTHONUNBUFFERED", "TRUE"); 来解决我的问题,以解决其他遇到问题的人。我希望我能多次支持您的回复,谢谢!
  • 你让我很开心。我正在拔头发,而 -u 标志正是我所需要的。
【解决方案2】:

期望有一个名为“unbuffer”的命令:

http://expect.sourceforge.net/example/unbuffer.man.html

这将禁用任何命令的缓冲

【讨论】:

    猜你喜欢
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2012-04-06
    • 2019-02-21
    • 2015-01-20
    • 2017-01-12
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多