【问题标题】:Getting latest lines of streaming stdout from a Python subprocess从 Python 子进程获取最新的流标准输出行
【发布时间】:2020-05-29 03:27:57
【问题描述】:

我的目标:每 M 秒从子进程读取最新的“块”(N 行)流式标准输出。

当前代码:

  1. 启动子进程
  2. 读取标准输出
  3. 一旦我有 N 行的块,打印出来(或保存为当前块)
  4. 等待 M 秒
  5. 重复
  6. 我还暂时添加了终止子进程的代码(在您按下 Ctrl-C 之前,这是一个无穷无尽的流)

我想要实现的是在等待 M 秒后,如果它始终读取 latest N 行而不是 stdout 中的后续 N 行(它们可以丢弃,因为我只对最新感兴趣)

我的最终目标是生成一个线程来运行进程并继续保存最新的行,然后在我需要流的最新结果时从主进程调用。

任何帮助将不胜感激!

#!/usr/bin/env python3
import signal
import time
from subprocess import Popen, PIPE

sig = signal.SIGTERM

N=9
M=5

countlines=0
p = Popen(["myprogram"], stdout=PIPE, bufsize=1, universal_newlines=True)

chunk=[]

for line in p.stdout:
    countlines+=1
    chunk.append(line)

    if len(chunk)==N:
        print(chunk)
        chunk=[]
        time.sleep(M)

    if countlines>100:
        p.send_signal(sig)
        break

print("done")

【问题讨论】:

  • 我认为这需要创建一个类似文件的类来接收输出,如果在 M 秒内没有新输入,则丢弃旧行并打印最后一行(将需要一个计时器对象或线程)。

标签: python stream subprocess


【解决方案1】:

经过大量搜索,我偶然发现了一个解决方案:

https://eli.thegreenplace.net/2017/interacting-with-a-long-running-child-process-in-python/

Eli 的“启动、交互、实时获取输出、终止”代码部分对我有用。 到目前为止,它是我发现的最优雅的解决方案。

适应了我上面的问题,并写在一个类中(这里没有显示):

def output_reader(self,proc):
    chunk=[]
    countlines=0
    for line in iter(proc.stdout.readline, b''):
        countlines+=1
        chunk.append(line.decode("utf-8"))
        if countlines==N:
            self.current_chunk = chunk
            chunk=[]
            countlines=0

def main():
    proc = subprocess.Popen(['myprocess'],
                            stdout=subprocess.PIPE,
                            stderr=subprocess.STDOUT)

    t = threading.Thread(target=output_reader, args=(proc,))
    t.start()

    try:
        time.sleep(0.2)
        for i in range(10):
            time.sleep(1) # waits a while before getting latest lines
            print(self.current_chunk)
    finally:
        proc.terminate()
        try:
            proc.wait(timeout=0.2)
            print('== subprocess exited with rc =', proc.returncode)
        except subprocess.TimeoutExpired:
            print('subprocess did not terminate in time')
    t.join()

【讨论】:

    【解决方案2】:

    这是另一种可能的解决方案。它是一个程序,您可以在管道中作为单独的进程运行,它提供一个 REST API,当被查询时将返回它在标准输入上读取的最后 N 行(其中 N 和端口号在标准输入上提供)。它在烧瓶中使用run,因此不应该在外部世界可以访问本地服务器端口来发出请求的情况下使用,尽管这可以调整。

    import sys
    import time
    import threading
    import argparse
    from flask import Flask, request
    from flask_restful import Resource, Api
    
    
    class Server:
    
        def __init__(self):
            self.data = {'at_eof': False,
                         'lines_read': 0,
                         'latest_lines': []}
            self.thread = None
            self.args = None
            self.stop = False
    
    
        def parse_args(self):
            parser = argparse.ArgumentParser()
            parser.add_argument("num_lines", type=int,
                                help="number of lines to cache")
            parser.add_argument("port", type=int,
                                help="port to serve on")
            self.args = parser.parse_args()
    
    
        def start_updater(self):
            def updater():
                lines = self.data['latest_lines']
                while True:
                    if self.stop:
                        return
                    line = sys.stdin.readline()
                    if not line:
                        break
                    self.data['lines_read'] += 1
                    lines.append(line)
                    while len(lines) > self.args.num_lines:
                        lines.pop(0)
                self.data['at_eof'] = True
            self.thread = threading.Thread(target=updater)
            self.thread.start()
    
    
        def get_data(self):
            return self.data
    
    
        def shutdown(self):
            self.stop = True
            func = request.environ.get('werkzeug.server.shutdown')
            if func:
                func()
                return 'Shutting down'
            else:
                return 'shutdown failed'
    
    
        def add_apis(self, app):
    
            class GetData(Resource):
                get = self.get_data
    
            class Shutdown(Resource):
                get = self.shutdown            
    
            api = Api(app)
            api.add_resource(GetData, "/getdata")
            api.add_resource(Shutdown, "/shutdown")
    
    
        def run(self):
            self.parse_args()
            self.start_updater()        
            app = Flask(__name__)
            self.add_apis(app)
            app.run(port=self.args.port)
    
    
    server = Server()
    server.run()
    

    示例用法:这是一个我们要为其输出提供服务的测试程序:

    import sys
    import time
    
    for i in range(100):
        print("this is line {}".format(i))
        sys.stdout.flush()
        time.sleep(.1)
    

    还有一个启动它的简单管道(此处来自 linux shell 提示符,但可以通过 subprocess.Popen 完成),在端口 8001 上提供最后 5 行:

    python ./writer.py  | python ./server.py 5 8001
    

    一个示例查询,这里使用 curl 作为客户端,但可以通过 Python requests 完成:

    $ curl -s http://localhost:8001/getdata
    {"at_eof": false, "lines_read": 30, "latest_lines": ["this is line 25\n", "this is line 26\n", "this is line 27\n", "this is line 28\n", "this is line 29\n"]}
    

    服务器还提供了一个http://localhost:<port>/shutdown URL 来终止它,但如果您在第一次看到"at_eof": true 之前调用它,那么预计作者会因管道损坏而死。

    【讨论】:

      猜你喜欢
      • 2017-03-28
      • 1970-01-01
      • 1970-01-01
      • 2016-11-17
      • 2011-11-28
      • 2020-03-03
      • 1970-01-01
      • 2016-03-26
      • 1970-01-01
      相关资源
      最近更新 更多