【问题标题】:Decorate \ delegate a File object to add functionality装饰\委托一个文件对象来添加功能
【发布时间】:2011-01-17 13:55:12
【问题描述】:

我一直在编写一个小的 Python 脚本,它使用 subprocess 模块和一个辅助函数来执行一些 shell 命令:

import subprocess as sp
def run(command, description):
    """Runs a command in a formatted manner. Returns its return code."""
    start=datetime.datetime.now()
    sys.stderr.write('%-65s' % description)
    s=sp.Popen(command, shell=True, stderr=sp.PIPE, stdout=sp.PIPE)
    out,err=s.communicate()
    end=datetime.datetime.now()
    duration=end-start
    status='Done' if s.returncode==0 else 'Failed'
    print '%s (%d seconds)' % (status, duration.seconds)

以下行读取标准输出和错误:

    s=sp.Popen(command, shell=True, stderr=sp.PIPE, stdout=sp.PIPE)
    out,err=s.communicate()

如您所见,没有使用 stdout 和 stderr。假设我想以格式化的方式将输出和错误消息写入日志文件,例如:

[STDOUT: 2011-01-17 14:53:55] <message>
[STDERR: 2011-01-17 14:53:56] <message>

我的问题是,最 Pythonic 的方法是什么?我想到了三个选项:

  1. 继承文件对象并覆盖write方法。
  2. 使用实现 write 的 Delegate 类。
  3. 以某种方式连接到PIPE 本身。

更新:参考测试脚本

我正在用这个脚本检查结果,保存为test.py

#!/usr/bin/python
import sys

sys.stdout.write('OUT\n')
sys.stdout.flush()
sys.stderr.write('ERR\n')
sys.stderr.flush()

有什么想法吗?

【问题讨论】:

  • 我放弃了。我能够包装(子类化)文件,以便写入添加时间戳和文件标题,但 Popen 在 fds 上工作,我看不到写入发生的位置。看起来它只是os.execvp,而标准输出句柄在某处很神奇。所以我对 Popen 进行了子类化,但它忽略了传递包装文件和 fd:title 映射的事实。

标签: python design-patterns logging subprocess


【解决方案1】:

1 和 2 是合理的解决方案,但重写 write() 是不够的。

问题是 Popen 需要文件句柄才能附加到进程,因此 Python 文件对象不起作用,它们必须是操作系统级别的。要解决这个问题,您必须拥有一个具有操作系统级别文件句柄的 Python 对象。我能想到解决这个问题的唯一方法是使用管道,所以你有一个操作系统级别的文件句柄可以写入。但是随后您需要另一个线程来轮询该管道以获取要读入的内容,以便它可以记录它。 (所以这更严格地说是 2 的实现,因为它委托给日志记录)。

说到做到:

import io
import logging
import os
import select
import subprocess
import time
import threading

LOG_FILENAME = 'output.log'
logging.basicConfig(filename=LOG_FILENAME,level=logging.DEBUG)

class StreamLogger(io.IOBase):
    def __init__(self, level):
        self.level = level
        self.pipe = os.pipe()
        self.thread = threading.Thread(target=self._flusher)
        self.thread.start()

    def _flusher(self):
        self._run = True
        buf = b''
        while self._run:
            for fh in select.select([self.pipe[0]], [], [], 0)[0]:
                buf += os.read(fh, 1024)
                while b'\n' in buf:
                    data, buf = buf.split(b'\n', 1)
                    self.write(data.decode())
            time.sleep(1)
        self._run = None

    def write(self, data):
        return logging.log(self.level, data)

    def fileno(self):
        return self.pipe[1]

    def close(self):
        if self._run:
            self._run = False
            while self._run is not None:
                time.sleep(1)
            os.close(self.pipe[0])
            os.close(self.pipe[1])

因此该类启动了一个操作系统级别的管道,Popen 可以将标准输入/输出/错误附加到子进程。它还启动一个线程,每秒轮询该管道的另一端以记录要记录的内容,然后将其记录到日志模块中。

为了完整起见,这个类可能应该实现更多的东西,但无论如何它都适用于这种情况。

示例代码:

with StreamLogger(logging.INFO) as out:
    with StreamLogger(logging.ERROR) as err:
        subprocess.Popen("ls", stdout=out, stderr=err, shell=True)

output.log 最终是这样的:

INFO:root:output.log
INFO:root:streamlogger.py
INFO:root:and
INFO:root:so
INFO:root:on

使用 Python 2.6、2.7 和 3.1 测试。

我认为 1 和 3 的任何实现都需要使用类似的技术。这有点牵强,但除非你能正确地自己制作 Popen 命令日志,否则我没有更好的主意)。

【讨论】:

  • +1:这与我的建议类似,但我无法让日志记录线程正常运行。
  • 这可以在 Windows 上运行吗?!我有疑问,因为 win 上的select 仅适用于套接字,而不适用于文件。
  • 我还没有在 Windows 上尝试过,但如果没有,则将 select 替换为您所做的任何操作,以检查 Windows 下的管道上是否有要读取的内容。
  • 这很好,但是在运行一个简单的 python 脚本时它会失败(output.txt 为空),该脚本将一些内容打印到 stdout 和一些内容到 stderr。有什么想法吗?
  • @Adam Matan:不,正如你所见,它与 'ls' 一起工作,所以原则上是合理的。我不知道您的简单 python 脚本是如何工作的,所以我无法解释为什么会失败,尽管我现在确实看到了一个错误:退出时它不会打印缓冲区的其余部分,所以可能就是这样。在 flusher 方法中最后添加一个self.write(buf.decode())。也许也将 sleep(1) 放在循环中。
【解决方案2】:

我建议使用 logging 标准库包的选项 3。在这种情况下,我会说其他 2 个是矫枉过正。

【讨论】:

  • 如何重定向PIPE
【解决方案3】:

1 和 2 不起作用。下面是原理的实现:

import subprocess
import time

FileClass = open('tmptmp123123123.tmp', 'w').__class__

class WrappedFile(FileClass):
    TIMETPL = "%Y-%m-%d %H:%M:%S"
    TEMPLATE = "[%s: %s] "

    def __init__(self, name, mode='r', buffering=None, title=None):
        self.title = title or name

        if buffering is None:
            super(WrappedFile, self).__init__(name, mode)
        else:
            super(WrappedFile, self).__init__(name, mode, buffering)

    def write(self, s):
        stamp = time.strftime(self.TIMETPL)
        if not s:
            return 
        # Add a line with timestamp per line to be written
        s = s.split('\n')
        spre = self.TEMPLATE % (self.title, stamp)
        s = "\n".join(["%s %s" % (spre, line) for line in s]) + "\n"
        super(WrappedFile, self).write(s)

它不起作用的原因是 Popen 从不调用 stdout.write。当我们调用它的 write 方法时,一个包装的文件会正常工作,如果传递给 Popen 甚至会被写入,但写入将发生在较低层,跳过 write 方法。

【讨论】:

    【解决方案4】:

    这个简单的解决方案对我有用:

    import sys
    import datetime
    import tempfile
    import subprocess as sp
    def run(command, description):
        """Runs a command in a formatted manner. Returns its return code."""
        with tempfile.SpooledTemporaryFile(8*1024) as so:
            print >> sys.stderr, '%-65s' % description
            start=datetime.datetime.now()
            retcode = sp.call(command, shell=True, stderr=sp.STDOUT, stdout=so)
            end=datetime.datetime.now()
            so.seek(0)
            for line in so.readlines():
                print >> sys.stderr,'logging this:', line.rstrip()
            duration=end-start
            status='Done' if retcode == 0 else 'Failed'
            print >> sys.stderr, '%s (%d seconds)' % (status, duration.seconds)
    
    REF_SCRIPT = r"""#!/usr/bin/python
    import sys
    
    sys.stdout.write('OUT\n')
    sys.stdout.flush()
    sys.stderr.write('ERR\n')
    sys.stderr.flush()
    """
    
    SCRIPT_NAME = 'refscript.py'
    
    if __name__ == '__main__':
        with open(SCRIPT_NAME, 'w') as script:
            script.write(REF_SCRIPT)
        run('python ' + SCRIPT_NAME, 'Reference script')
    

    【讨论】:

    • 它是否适用于混合错误\输出结果?我的参考脚本失败了(见上面的更新)。
    • @Adam Matan 我用run('ls /nonexistent', 'Does not exist') 尝试过检查stderr,它成功了。在对Popen 的调用中,您确实需要确保它是stderr=sp.SDOUT。顺便说一句,我正在更改代码以删除 s.poll() 循环,因为它可能是一个不需要的快速缠绕循环。
    • @Adam Matan。它也适用于您的参考脚本。查看编辑。我正在运行 Ubuntu 10.10 和 Python 2.6.6)。
    • 请注意,此实验不允许在输出中添加时间戳,否则可能没用。
    【解决方案5】:

    这使用Adam Rosenfield's make_async and read_async。虽然我最初的答案使用了select.epoll,因此仅适用于 Linux,但它现在使用了select.select,它应该可以在 Unix 或 Windows 下工作。

    这会将子进程的输出记录到/tmp/test.log,因为它发生:

    import logging
    import subprocess
    import shlex
    import select
    import fcntl
    import os
    import errno
    
    def make_async(fd):
        # https://stackoverflow.com/a/7730201/190597
        '''add the O_NONBLOCK flag to a file descriptor'''
        fcntl.fcntl(fd, fcntl.F_SETFL, fcntl.fcntl(fd, fcntl.F_GETFL) | os.O_NONBLOCK)
    
    def read_async(fd):
        # https://stackoverflow.com/a/7730201/190597
        '''read some data from a file descriptor, ignoring EAGAIN errors'''
        try:
            return fd.read()
        except IOError, e:
            if e.errno != errno.EAGAIN:
                raise e
            else:
                return ''
    
    def log_process(proc,stdout_logger,stderr_logger):
        loggers = { proc.stdout: stdout_logger, proc.stderr:  stderr_logger }
        def log_fds(fds):
            for fd in fds:
                out = read_async(fd)
                if out.strip():
                    loggers[fd].info(out)
        make_async(proc.stdout)
        make_async(proc.stderr)
        while True:
            # Wait for data to become available 
            rlist, wlist, xlist = select.select([proc.stdout, proc.stderr], [], [])
            log_fds(rlist)
            if proc.poll() is not None:
                # Corner case: check if more output was created
                # between the last call to read_async and now
                log_fds([proc.stdout, proc.stderr])                
                break
    
    if __name__=='__main__':
        formatter = logging.Formatter('[%(name)s: %(asctime)s] %(message)s')
        handler = logging.FileHandler('/tmp/test.log','w')
        handler.setFormatter(formatter)
    
        stdout_logger=logging.getLogger('STDOUT')
        stdout_logger.setLevel(logging.DEBUG)
        stdout_logger.addHandler(handler)
    
        stderr_logger=logging.getLogger('STDERR')
        stderr_logger.setLevel(logging.DEBUG)
        stderr_logger.addHandler(handler)        
    
        proc = subprocess.Popen(shlex.split('ls -laR /tmp'),
                                stdout=subprocess.PIPE,
                                stderr=subprocess.PIPE)
        log_process(proc,stdout_logger,stderr_logger)
    

    【讨论】:

    • 这是一个很好的解决方案,但它会保留 stdout\stderr 直到进程完成,然后将它们写入日志文件。这意味着时间戳表示记录的时间,而不是事件的时间,并且无法跟踪事件的顺序(写入所有 stdout 消息,然后写入所有 stderr 消息)。
    猜你喜欢
    • 2011-07-29
    • 2011-01-30
    • 1970-01-01
    • 2010-11-22
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2021-11-24
    • 1970-01-01
    相关资源
    最近更新 更多