【问题标题】:Determining time since process's last output - with subprocess.Popen确定自进程上次输出以来的时间 - 使用 subprocess.Popen
【发布时间】:2016-10-15 19:48:02
【问题描述】:

我正在为测试套件中的进程编写某种看门狗。我需要确定测试是否挂起。

我可以简单地使用subprocess.Popen(...) 启动该过程,然后使用Popen.wait(timeout=to)Popen.poll() 并保留我自己的计时器。然而,测试在执行时间上差别很大,这使得不可能有一个对所有测试都合理的“超时”值。

我发现确定测试是否挂起的一个好方法是在进程最后一次输出任何内容时设置一个“超时”。为此,我考虑使用

process = subprocess.Popen(args='<program>', stdout=subprocess.PIPE, stderr=subprocess.STDOUT, ...)

Popen.communicate(),以确定stdout 和/或stderr 何时不是None。问题是Popen.communicate(),没有“超时”只会等到进程终止,而“超时”会引发TimeoutExpired 异常,我无法确定是否读取了任何内容。 TimeoutExpired.output 是空的,顺便说一句。

我在文档中找不到任何允许手动执行“读取”的内容。此外,该进程通常有很多输出,因此以stdout=&lt;open_file_descriptor&gt; 开头将是有益的,因为我不会担心溢出管道缓冲区。

更新/解决方案:

Popen.stdoutPopen.stderr 返回一个“可读流对象”,可用于手动轮询/选择和读取。我最终使用了 select 'Polling Objects',它使用了 poll() 系统调用,如下所示:

import os
import select
import subprocess

p = subprocess.Popen(args="<program>", shell=True, universal_newlines=True,
                     stdout=subprocess.PIPE, stderr=subprocess.PIPE)
poll_obj = select.poll()
poll_obj.register(p.stdout, select.POLLIN)
poll_obj.register(p.stderr, select.POLLIN)

while p.poll() is None:
    events = True
    while events:
        events = poll_obj.poll(10)
        for fd, event in events:
            if event & select.POLLIN:
                print("STDOUT: " if fd == p.stdout.fileno() else "STDERR: ")
                print(os.read(fd, 1024).decode())
            # else some other error (see 'Polling Objects')

【问题讨论】:

    标签: python linux subprocess


    【解决方案1】:

    这有点像here..

    基本上您需要使用select() 来轮询 fd 以查看他们是否有输入:

    #!/usr/bin/python
    
    import fcntl import os import select import subprocess
    
    
    def setnonblocking(fd):
        fl = fcntl.fcntl(fd, fcntl.F_GETFL)
        fcntl.fcntl(fd, fcntl.F_SETFL, fl | os.O_NONBLOCK)
        return fd
    
    p = subprocess.Popen("/bin/sh -c 'c=10; while [ $c -gt 0 ]; do echo $c hello; sleep 1; >&2 echo world; sleep 1; let c=$c-1; done'", stderr=subprocess.PIPE, stdout=subprocess.PIPE, shell=True)
    
    process_fds = map(setnonblocking, [p.stdout, p.stderr])
    
    while process_fds:
        readable, writable, exceptional = select.select(process_fds, [], process_fds, 100)
        print "Select: ", readable, writable, exceptional
        print "Exitcode: ", p.poll()
        for fd in readable:
            data = os.read(fd.fileno(), 1024)
            if data == "":  # EOF
                process_fds.remove(fd)
                continue
            if fd == p.stdout:
                print "STDOUT: ",
            if fd == p.stderr:
                print "STDERR: ",
            print data,
        for fd in exceptional:
            process_fds.remove(fd)
    

    输出:

    Select:  [<open file '<fdopen>', mode 'rb' at 0x7fed75daa6f0>] [] []
    Exitcode:  None
    STDOUT:  10 hello
    Select:  [<open file '<fdopen>', mode 'rb' at 0x7fed75daa660>] [] []
    Exitcode:  None
    STDERR:  world
    Select:  [<open file '<fdopen>', mode 'rb' at 0x7fed75daa6f0>] [] []
    Exitcode:  None
    STDOUT:  9 hello
    Select:  [<open file '<fdopen>', mode 'rb' at 0x7fed75daa660>] [] []
    Exitcode:  None
    [...]
    STDOUT:  1 hello
    Select:  [<open file '<fdopen>', mode 'rb' at 0x7fed75daa660>] [] []
    Exitcode:  None
    STDERR:  world
    Select:  [<open file '<fdopen>', mode 'rb' at 0x7fed75daa6f0>, <open file '<fdopen>', mode 'rb' at 0x7fed75daa660>] [] []
    Exitcode:  1
    

    使用os.read() 代替fd.read(),因为您需要以非面向行的方式阅读。 fd.read() 等到找到换行符——但你可能会阻塞。使用此方法,您还可以拆分 stderrstdout

    编辑:修改为处理在EOFp.stdoutp.stderr 之前退出的进程

    【讨论】:

    • @J.F.Sebastian 感谢您的关注!我已经修改只是为了循环直到两个 fd 都是 EOF。
    • 在 Python 中使用 while some_list: 而不是 while len(some_list):。我不确定将 select() 与阻塞 fds 一起使用是否安全。
    • @J.F.Sebastian 风格明智,我通常更喜欢 len(list) 的明确性。来自select() 系统调用手册页:如果可以在不阻塞的情况下执行相应的 I/O 操作(例如 read(2)),则认为文件描述符已准备就绪。 这意味着 fd否则可能会阻塞。
    • 使用 while len(items): 代替 while items: 不是惯用的。你确定select() 不能早点返回,例如,在信号上?此外,您可能应该在超时时退出循环。
    • 似乎select()是否以及如何被信号中断可能取决于平台和python版本(例如,可能会引发异常,可能会自动重新启动)。我已经检查过:os.read() 可能会在select.select() 之后阻塞(因为其他一些进程可能会消耗管道或(在套接字的情况下)由于网络堆栈实现中的一些怪异:可能会报告数据准备就绪,然后无论出于何种原因被内核丢弃)...
    【解决方案2】:

    以下是如何在 Python 3 中的 Unix 上实现“自子进程的最后一次输出以来的超时”:

    #!/usr/bin/env python3
    import os
    import selectors
    import sys
    from subprocess import Popen, PIPE, _PopenSelector as Selector
    
    timeout = 1  # seconds
    with Popen([sys.executable, '-c', '''import time
    for i in range(10):  # dummy script
        time.sleep(i)
        print(i, flush=True)
    '''], stdout=PIPE, stderr=PIPE) as process:
        pipes = {process.stdout: 1, process.stderr: 2}  # where to echo data
        with Selector() as sel:
            for pipe in pipes:
                os.set_blocking(pipe.fileno(), False)
                sel.register(pipe, selectors.EVENT_READ)
            while pipes:
                events = sel.select(timeout)
                if not events:  # timeout
                    process.kill()
                for key, mask in events:
                    assert mask == selectors.EVENT_READ
                    data = os.read(key.fd, 512)
                    if data == b'':  # EOF
                        sel.unregister(key.fileobj)
                        del pipes[key.fileobj]
                    else:  # echo data
                        os.write(pipes[key.fileobj], data)
    

    注意:循环不会在process.poll() 上终止——没有数据丢失。该代码使用subprocess 作者喜欢的相同选择器,否则可以使用sel = selectors.DefaultSelector()。如果孙子进程可能继承管道,那么您应该更积极地在超时时打破循环(EOF may be delayed)。要在 Python 3.5 之前实现os.set_blocking(),可以使用fcntl

    from fcntl import fcntl, F_GETFL, F_SETFL
    
    def set_nonblocking(fd):
        fcntl(fd, F_SETFL, fcntl(fd, F_GETFL) | os.O_NONBLOCK) # set O_NONBLOCK
    

    【讨论】:

      猜你喜欢
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 2011-05-21
      • 1970-01-01
      • 1970-01-01
      相关资源
      最近更新 更多