【问题标题】:Tornado asynchronous coroutineTornado 异步协程
【发布时间】:2021-02-02 10:47:50
【问题描述】:

很久没有使用龙卷风了。我想要一个 websocket,它可以从运行龙卷风的主机的串行设备中获取更新。所以我尝试使用龙卷风进行多处理,但该进程无法访问龙卷风 websocket。我试图将它合并为协程,但似乎没有产生。

class WebApplication(tornado.web.Application):
    def __init__(self):
        handlers = [
            (r'/', IndexPageHandler),
            (r"/config", ConfigHandler),
            (r"/shutdown", ShutdownHandler),
            (r'/websocket', WebSocketHandler),
            (r'/(.*)', tornado.web.StaticFileHandler, {'path': resourcesWeb})
        ]

        settings = {
            'debug': debug,
            'static_path': resourcesWeb,
            'template_path': 'templates'
        }
        tornado.web.Application.__init__(self, handlers, **settings)

    @gen.coroutine
    def serial_reader(self):
        log('serial_reader: start')
        done = False
        while not done:
            sh.read()
            serial_data_from = str(sh.data)
            if len(serial_data_from) > 0:
                if debug:
                    log('serial read:' + serial_data_from)
                    yield [con.write_message(serial_data_from) for con in WebSocketHandler.connections]
            yield gen.sleep(0.3)
        log('serial_reader: exit')

Python 3.8.5,Tornad 6.1

我将如何正确且不断地使用来自 tornado 应用程序外部的数据更新 websocket

【问题讨论】:

    标签: python-3.x tornado


    【解决方案1】:

    由于sh.read 被阻塞,您需要在执行程序中运行它。然后要在主线程中通知客户端,您需要使用IOLoop.add_callback(可以安全地从任何线程调用)。这也意味着 reader 方法变成了常规的同步方法。

    例子:

    from concurrent.futures import ThreadPoolExecutor
    import functools
    
    from tornado import web, websocket, ioloop
    
    log = print
    
    
    class IndexHandler(web.RequestHandler):
        def get(self):
            self.write("""<html>
                <textarea cols="30" rows="10" id="output">%s</textarea><br />
                <a href="/start" target="f" onclick="log(this.innerHTML)">start</a><br />
                <a href="/stop" target="f" onclick="log(this.innerHTML)">stop</a><br />
                <iframe name="f" width="100" height="30"></iframe>
                <script>
                    ws = new WebSocket("ws://localhost:8888/stream");
                    out_el = document.getElementById("output");
                    function log(data) {out_el.value = data + "\\n" + out_el.value;}
                    ws.onmessage = function (ev) {log(ev.data);}
                </script>""" % "\n".join(map(str, reversed(self.application.read_data))))
    
    
    class StartHandler(web.RequestHandler):
        def get(self):
            self.application.start_reader()
            self.write("Started")
    
    
    class StopHandler(web.RequestHandler):
        def get(self):
            self.application.stop_reader()
            self.write("Stopped")
    
    
    class WebSocketHandler(websocket.WebSocketHandler):
        connections = set()
    
        def open(self):
            WebSocketHandler.connections.add(self)
    
        def on_close(self):
            if self in WebSocketHandler.connections:
                WebSocketHandler.connections.remove(self)
    
    
    class WebApplication(web.Application):
        def __init__(self, autostart=False):
            handlers = [
                (r"/", IndexHandler),
                (r"/start", StartHandler),
                (r"/stop", StopHandler),
                (r'/stream', WebSocketHandler),
            ]
            web.Application.__init__(self, handlers)
            self._reader_executor = ThreadPoolExecutor(1)
            self._keep_reading = None
            self.read_data = []
            if autostart:
                self.start_reader()
        
        def start_reader(self):
            if not self._keep_reading:
                self._keep_reading = True
                loop = ioloop.IOLoop.current()
                self._reader_future = loop.run_in_executor(self._reader_executor, functools.partial(self.reader, loop))
        
        def stop_reader(self):
            if self._keep_reading:
                self._keep_reading = False
                self._reader_future.cancel()
        
        def notify_clients(self, data=None):
            for con in WebSocketHandler.connections:
                try:
                    con.write_message("{}".format(data))
                except Exception as ex:
                    log("error sending to {}".format(con))
        
        def reader(self, main_loop):
            import random
            import time
            while self._keep_reading:
                time.sleep(1 + random.random())  # simulate read - block for some time
                data = random.getrandbits(32)
                print("reader: data={}".format(data))
                if data:
                    main_loop.add_callback(self.notify_clients, data)
                    self.read_data.append(data)
                time.sleep(0.1)
    
    
    if __name__ == "__main__":
        app = WebApplication(True)
        app.listen(8888)
        loop = ioloop.IOLoop.current()
        try:
            loop.start()
        except KeyboardInterrupt as ex:
            app.stop_reader()
            for con in WebSocketHandler.connections:
                con.close()
            loop.stop()
    
    

    【讨论】:

    • 我只需要使用 self.serial_reader() 就可以启动,但是 WebAppcication 不再响应
    • 啊,sh.read() 可能会阻塞。您应该将读取拆分为同步方法并在执行程序中运行它 - tornadoweb.org/en/stable/guide/…
    • 谢谢。该文档似乎在谈论这个问题。但是,要获得您的赃物,您至少可以举一个运行示例?
    • 谢谢。我必须研究它。但是效果很好
    猜你喜欢
    • 1970-01-01
    • 2011-06-23
    • 1970-01-01
    • 2013-08-07
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2016-03-29
    相关资源
    最近更新 更多