【问题标题】:Checking to see if there is more data to read from a file descriptor using Python's select module使用 Python 的 select 模块检查是否有更多数据要从文件描述符中读取
【发布时间】:2014-05-15 11:56:51
【问题描述】:

我有一个在线程中创建子进程的程序,以便线程可以不断检查特定的输出条件(来自 stdout 或 stderr),并调用适当的回调,而程序的其余部分继续。这是该代码的精简版:

import select
import subprocess
import threading

def run_task():
    command = ['python', 'a-script-that-outputs-lines.py']
    proc = subprocess.Popen(command, stdout = subprocess.PIPE, stderr = subprocess.PIPE)
    while True:

        ready, _, _ = select.select((proc.stdout, proc.stderr), (), (), .1)

        if proc.stdout in ready:
            next_line_to_process = proc.stdout.readline()
            # process the output

        if proc.stderr in ready:
            next_line_to_process = proc.stderr.readline()
            # process the output

        if not ready and proc.poll() is not None:
            break

thread = threading.Thread(target = run_task)
thread.run()

它工作得相当好,但我希望线程在满足两个条件后退出:正在运行的子进程已完成,并且 stdout 和 stderr 中的所有数据都已处理。

我遇到的困难是,如果我的最后一个条件与上面一样(if not ready and proc.poll() is not None),那么线程永远不会退出,因为一旦 stdout 和 stderr 的文件描述符被标记为就绪,它们就永远不会变得未就绪(即使毕竟的数据已从中读取,read() 将挂起或readline() 将返回一个空字符串。

如果我将该条件更改为仅if proc.poll() is not None,则程序退出时循环存在,我不能保证它看到了所有需要处理的数据。

这只是错误的方法,还是有办法可靠地确定您何时读取了将写入文件描述符的所有数据?或者这是尝试从子进程的 stderr/stdout 读取时特有的问题?

我一直在 Python 2.5(在 OS X 上运行)上尝试这个,并且还在 Python 2.6 上尝试过基于 select.poll()select.epoll() 的变体(在具有 2.6 内核的 D​​ebian 上运行)。

【问题讨论】:

  • 我目前正在运行的解决方案是测试next_line_to_process是否为空(在调用readline()之后),如果它为空并且proc.poll()的输出为None,那么我从就绪列表中删除文件描述符并继续下一个(或查看是否该退出)。但我很想知道是否有其他解决方案。

标签: python linux select subprocess


【解决方案1】:

select 模块适用于如果你想知道你是否可以从管道中读取而不阻塞。

为确保您已读取所有数据,请使用更简单的条件 if proc.poll() is not None: break 并在循环后调用 rest = [pipe.read() for pipe in [p.stdout, p.stderr]]

子进程不太可能在关闭之前关闭其 stdout/stderr,因此为简单起见,您可以跳过处理 EOF 的逻辑。


不要直接调用Thread.run(),而是使用Thread.start()。您可能根本不需要这里的单独线程。

不要在select() 之后调用p.stdout.readline(),它可能会阻塞,请改用os.read(p.stdout.fileno(), limit)。空字节串表示对应管道的EOF。


作为替代或补充,您可以使用fcntl 模块使管道不阻塞:

import os
from fcntl import fcntl, F_GETFL, F_SETFL

def make_nonblocking(fd):
    return fcntl(fd, F_SETFL, fcntl(fd, F_GETFL) | os.O_NONBLOCK)

并在读取时处理 io/os 错误。

【讨论】:

  • 谢谢,这真的很有帮助。我不知道fcntl 模块。
  • 子进程确实会在关闭前关闭其文件描述符。
  • @sivann 进程在执行过程中关闭其标准输出的可能性很小(即,有可能,但我不知道任何现实世界的例子)。
【解决方案2】:

如上所述,我的最终解决方案如下,以防这对任何人都有帮助。我认为这是正确的方法,因为我现在有 97.2% 的把握你不能只使用 select()/poll()read()

import select
import subprocess
import threading

def run_task():
    command = ['python', 'a-script-that-outputs-lines.py']
    proc = subprocess.Popen(command, stdout = subprocess.PIPE, stderr = subprocess.PIPE)
    while True:

        ready, _, _ = select.select((proc.stdout, proc.stderr), (), (), .1)

        if proc.stdout in ready:
            next_line_to_process = proc.stdout.readline()
            if next_line_to_process:
                # process the output
            elif proc.returncode is not None:
                # The program has exited, and we have read everything written to stdout
                ready = filter(lambda x: x is not proc.stdout, ready)

        if proc.stderr in ready:
            next_line_to_process = proc.stderr.readline()
            if next_line_to_process:
                # process the output
            elif proc.returncode is not None:
                # The program has exited, and we have read everything written to stderr
                ready = filter(lambda x: x is not proc.stderr, ready)

        if proc.poll() is not None and not ready:
            break

thread = threading.Thread(target = run_task)
thread.run()

【讨论】:

    【解决方案3】:

    你可以在管道的文件描述符上做一个原始的os.read(fd, size),而不是使用readline()。这是一个非阻塞操作,也可以检测 EOF(在这种情况下,它返回一个空字符串或字节对象)。您必须自己实现行拆分和缓冲。使用这样的东西:

    class NonblockingReader():
      def __init__(self, pipe):
        self.fd = pipe.fileno()
        self.buffer = ""
    
      def readlines(self):
        data = os.read(self.fd, 2048)
        if not data:
          return None
    
        self.buffer += data
        if os.linesep in self.buffer:
          lines = self.buffer.split(os.linesep)
          self.buffer = lines[-1]
          return lines[:-1]
        else:
          return []
    

    【讨论】:

    • 在这种情况下,阻塞/非阻塞行为并不是什么大问题,因为我没有两个线程从同一个文件描述符中读取。但是,os.read() 在到达文件末尾时返回一个空字符串,而不是 None:docs.python.org/2/library/os.html#os.read
    • 没错,2.x中是空字符串。但我认为在较新的 3.x 版本中它应该是 None 。
    • 我不这么认为。在 Python 3 中,它似乎是一个空的 bytes 对象:docs.python.org/3.4/library/os.html#os.read
    猜你喜欢
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2021-03-29
    • 1970-01-01
    • 2010-10-27
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多