这是另一种可能的解决方案。它是一个程序,您可以在管道中作为单独的进程运行,它提供一个 REST API,当被查询时将返回它在标准输入上读取的最后 N 行(其中 N 和端口号在标准输入上提供)。它在烧瓶中使用run,因此不应该在外部世界可以访问本地服务器端口来发出请求的情况下使用,尽管这可以调整。
import sys
import time
import threading
import argparse
from flask import Flask, request
from flask_restful import Resource, Api
class Server:
def __init__(self):
self.data = {'at_eof': False,
'lines_read': 0,
'latest_lines': []}
self.thread = None
self.args = None
self.stop = False
def parse_args(self):
parser = argparse.ArgumentParser()
parser.add_argument("num_lines", type=int,
help="number of lines to cache")
parser.add_argument("port", type=int,
help="port to serve on")
self.args = parser.parse_args()
def start_updater(self):
def updater():
lines = self.data['latest_lines']
while True:
if self.stop:
return
line = sys.stdin.readline()
if not line:
break
self.data['lines_read'] += 1
lines.append(line)
while len(lines) > self.args.num_lines:
lines.pop(0)
self.data['at_eof'] = True
self.thread = threading.Thread(target=updater)
self.thread.start()
def get_data(self):
return self.data
def shutdown(self):
self.stop = True
func = request.environ.get('werkzeug.server.shutdown')
if func:
func()
return 'Shutting down'
else:
return 'shutdown failed'
def add_apis(self, app):
class GetData(Resource):
get = self.get_data
class Shutdown(Resource):
get = self.shutdown
api = Api(app)
api.add_resource(GetData, "/getdata")
api.add_resource(Shutdown, "/shutdown")
def run(self):
self.parse_args()
self.start_updater()
app = Flask(__name__)
self.add_apis(app)
app.run(port=self.args.port)
server = Server()
server.run()
示例用法:这是一个我们要为其输出提供服务的测试程序:
import sys
import time
for i in range(100):
print("this is line {}".format(i))
sys.stdout.flush()
time.sleep(.1)
还有一个启动它的简单管道(此处来自 linux shell 提示符,但可以通过 subprocess.Popen 完成),在端口 8001 上提供最后 5 行:
python ./writer.py | python ./server.py 5 8001
一个示例查询,这里使用 curl 作为客户端,但可以通过 Python requests 完成:
$ curl -s http://localhost:8001/getdata
{"at_eof": false, "lines_read": 30, "latest_lines": ["this is line 25\n", "this is line 26\n", "this is line 27\n", "this is line 28\n", "this is line 29\n"]}
服务器还提供了一个http://localhost:<port>/shutdown URL 来终止它,但如果您在第一次看到"at_eof": true 之前调用它,那么预计作者会因管道损坏而死。