【发布时间】:2014-02-10 10:23:05
【问题描述】:
背景
我正在使用subprocess 模块从python 启动一个进程。我希望能够在写入/缓冲后立即访问输出(stdout、stderr)。
- 解决方案必须支持 Windows 7。我也需要 Unix 系统的解决方案,但我怀疑 Windows 的情况更难解决。
- 该解决方案应支持 Python 2.6。我目前仅限于 Python 2.6,但仍然赞赏使用更高版本 Python 的解决方案。
- 该解决方案不应使用第三方库。理想情况下,我会喜欢使用标准库的解决方案,但我愿意接受建议。
- 该解决方案必须适用于几乎所有流程。假设无法控制正在执行的进程。
子进程
例如,假设我想通过subprocess 运行一个名为counter.py 的python 文件。 counter.py的内容如下:
import sys
for index in range(10):
# Write data to standard out.
sys.stdout.write(str(index))
# Push buffered data to disk.
sys.stdout.flush()
父进程
负责执行counter.py例子的父进程如下:
import subprocess
command = ['python', 'counter.py']
process = subprocess.Popen(
cmd,
bufsize=1,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
)
问题
使用counter.py 示例,我可以在流程完成之前访问数据。这很棒!这正是我想要的。但是,删除 sys.stdout.flush() 调用会阻止数据在我想要的时候被访问。这是不好的!这正是我不想要的。我的理解是flush() 调用强制将数据写入磁盘,并且在数据写入磁盘之前它仅存在于缓冲区中。请记住,我希望能够运行几乎任何进程。我不希望该过程执行这种刷新,但我仍然希望数据实时可用(或接近它)。有没有办法做到这一点?
关于父进程的简要说明。您可能会注意到我使用bufsize=0 进行行缓冲。我希望这会导致每一行都刷新到磁盘,但它似乎不是那样工作的。这个论点如何运作?
您还会注意到我正在使用subprocess.PIPE。这是因为它似乎是在父进程和子进程之间产生 IO 对象的唯一值。我通过查看subprocess 模块中的Popen._get_handles 方法得出了这个结论(这里我指的是Windows 定义)。有两个重要的变量,c2pread 和 c2pwrite,它们是根据传递给 Popen 构造函数的 stdout 值设置的。例如,如果未设置 stdout,则未设置 c2pread 变量。使用文件描述符和类文件对象时也是如此。我真的不知道这是否重要,但我的直觉告诉我,我想要读写 IO 对象来实现我想要实现的目标——这就是我选择subprocess.PIPE 的原因。如果有人能更详细地解释这一点,我将不胜感激。同样,如果有令人信服的理由使用 subprocess.PIPE 以外的其他内容,我会全力以赴。
从子进程中获取数据的方法
import time
import subprocess
import threading
import Queue
class StreamReader(threading.Thread):
"""
Threaded object used for reading process output stream (stdout, stderr).
"""
def __init__(self, stream, queue, *args, **kwargs):
super(StreamReader, self).__init__(*args, **kwargs)
self._stream = stream
self._queue = queue
# Event used to terminate thread. This way we will have a chance to
# tie up loose ends.
self._stop = threading.Event()
def stop(self):
"""
Stop thread. Call this function to terminate the thread.
"""
self._stop.set()
def stopped(self):
"""
Check whether the thread has been terminated.
"""
return self._stop.isSet()
def run(self):
while True:
# Flush buffered data (not sure this actually works?)
self._stream.flush()
# Read available data.
for line in iter(self._stream.readline, b''):
self._queue.put(line)
# Breather.
time.sleep(0.25)
# Check whether thread has been terminated.
if self.stopped():
break
cmd = ['python', 'counter.py']
process = subprocess.Popen(
cmd,
bufsize=1,
stdout=subprocess.PIPE,
)
stdout_queue = Queue.Queue()
stdout_reader = StreamReader(process.stdout, stdout_queue)
stdout_reader.daemon = True
stdout_reader.start()
# Read standard out of the child process whilst it is active.
while True:
# Attempt to read available data.
try:
line = stdout_queue.get(timeout=0.1)
print '%s' % line
# If data was not read within time out period. Continue.
except Queue.Empty:
# No data currently available.
pass
# Check whether child process is still active.
if process.poll() != None:
# Process is no longer active.
break
# Process is no longer active. Nothing more to read. Stop reader thread.
stdout_reader.stop()
这里我正在执行从线程中的子进程读取标准的逻辑。这允许在数据可用之前读取被阻塞的情况。我们不是等待一些可能很长的时间,而是检查是否有可用数据,在超时期限内读取,如果没有则继续循环。
我还尝试了另一种使用非阻塞读取的方法。这种方法使用ctypes 模块来访问Windows 系统调用。请注意,我并不完全理解我在这里所做的事情——我只是试图理解我在其他帖子中看到的一些示例代码。在任何情况下,以下 sn-p 都不能解决缓冲问题。我的理解是,这只是对抗可能较长的阅读时间的另一种方式。
import os
import subprocess
import ctypes
import ctypes.wintypes
import msvcrt
cmd = ['python', 'counter.py']
process = subprocess.Popen(
cmd,
bufsize=1,
stdout=subprocess.PIPE,
)
def read_output_non_blocking(stream):
data = ''
available_bytes = 0
c_read = ctypes.c_ulong()
c_available = ctypes.c_ulong()
c_message = ctypes.c_ulong()
fileno = stream.fileno()
handle = msvcrt.get_osfhandle(fileno)
# Read available data.
buffer_ = None
bytes_ = 0
status = ctypes.windll.kernel32.PeekNamedPipe(
handle,
buffer_,
bytes_,
ctypes.byref(c_read),
ctypes.byref(c_available),
ctypes.byref(c_message),
)
if status:
available_bytes = int(c_available.value)
if available_bytes > 0:
data = os.read(fileno, available_bytes)
print data
return data
while True:
# Read standard out for child process.
stdout = read_output_non_blocking(process.stdout)
print stdout
# Check whether child process is still active.
if process.poll() != None:
# Process is no longer active.
break
非常感谢您的评论。
干杯
【问题讨论】:
-
我不确定我是否完全理解您的问题,但问题"Python subprocess reading" 可能值得一看。
-
@iljau:谢谢。这是一个类似的问题,EOF 条件可能会在这里发挥作用,但对该问题的回答并不能真正提供解决方案。我认为这更多的是关于如何控制缓冲的问题。我需要一些方法来强制数据更频繁地刷新(或写入磁盘)。或者也许有一个完全不同的解决方案。我在想套接字可能有用吗?我还在调查。另一方面 - 让操作系统做它的事情也许更明智。
-
@iljau:再次感谢您的努力。这个问题有一些有用的回答。但是,
select和fcntl不适用于 Windows 平台(支持select,但仅使用socket对象)。asyncproc、twisted和tornado都是第三方软件包,但我还是应该看看这些,即使只是为了教育目的。PYTHONUNBUFFERED环境变量有效,但前提是可执行文件(子进程)是 python 脚本。不错! -
现在这是一个长镜头,但文章"Asynchronous I/O in Windows for Unix Programmers"可能会提供一些有用的指导。
标签: python buffer subprocess pipe flush