【问题标题】:Implementing and testing WebSocket server connection timeout实现和测试 WebSocket 服务器连接超时
【发布时间】:2014-06-01 14:38:12
【问题描述】:

我正在 Tornado 3.2 中实现一个 WebSockets 服务器。连接到服务器的客户端不会是浏览器。

对于服务器和客户端之间来回通信的情况,我想添加一个最大值。服务器在关闭连接之前等待客户端响应的时间。

这大致是我一直在尝试的:

import datetime
import tornado

class WSHandler(WebSocketHandler):

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.timeout = None

    def _close_on_timeout(self):
        if self.ws_connection:
            self.close()

    def open(self):
        initialize()

    def on_message(self, message):
        # Remove previous timeout, if one exists.
        if self.timeout:
            tornado.ioloop.IOLoop.instance().remove_timeout(self.timeout)
            self.timeout = None

        if is_last_message:
            self.write_message(message)
            self.close()
        else:
            # Add a new timeout.
            self.timeout = tornado.ioloop.IOLoop.instance().add_timeout(
                datetime.timedelta(milliseconds=1000), self._close_on_timeout)
            self.write_message(message)

我是一个笨蛋吗?有没有更简单的方法可以做到这一点?我什至无法通过上面的 add_timeout 安排一个简单的打印语句。

我还需要一些帮助来测试这个。这是我目前所拥有的:

from tornado.websocket import websocket_connect
from tornado.testing import AsyncHTTPTestCase, gen_test
import time

class WSTests(AsyncHTTPTestCase):

    @gen_test
    def test_long_response(self):
        ws = yield websocket_connect('ws://address', io_loop=self.io_loop)

        # First round trip.
        ws.write_message('First message.')
        result = yield ws.read_message()
        self.assertEqual(result, 'First response.')

        # Wait longer than the timeout.
        # The test is in its own IOLoop, so a blocking sleep should be okay?
        time.sleep(1.1)

        # Expect either write or read to fail because of a closed socket.
        ws.write_message('Second message.')
        result = yield ws.read_message()

        self.assertNotEqual(result, 'Second response.')

客户端写入和读取套接字没有问题。这可能是因为 add_timeout 没有触发。

测试是否需要以某种方式让出以允许服务器上的超时回调运行?我不会想到,因为文档说测试在他们自己的 IOLoop 中运行。

编辑

根据 Ben 的建议,这是工作版本。

import datetime
import tornado

class WSHandler(WebSocketHandler):

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.timeout = None

    def _close_on_timeout(self):
        if self.ws_connection:
            self.close()

    def open(self):
        initialize()

    def on_message(self, message):
        # Remove previous timeout, if one exists.
        if self.timeout:
            tornado.ioloop.IOLoop.current().remove_timeout(self.timeout)
            self.timeout = None

        if is_last_message:
            self.write_message(message)
            self.close()
        else:
            # Add a new timeout.
            self.timeout = tornado.ioloop.IOLoop.current().add_timeout(
                datetime.timedelta(milliseconds=1000), self._close_on_timeout)
            self.write_message(message)

测试:

from tornado.websocket import websocket_connect
from tornado.testing import AsyncHTTPTestCase, gen_test
import time

class WSTests(AsyncHTTPTestCase):

    @gen_test
    def test_long_response(self):
        ws = yield websocket_connect('ws://address', io_loop=self.io_loop)

        # First round trip.
        ws.write_message('First message.')
        result = yield ws.read_message()
        self.assertEqual(result, 'First response.')

        # Wait a little more than the timeout.
        yield gen.Task(self.io_loop.add_timeout, datetime.timedelta(seconds=1.1))

        # Expect either write or read to fail because of a closed socket.
        ws.write_message('Second message.')
        result = yield ws.read_message()
        self.assertEqual(result, None)

【问题讨论】:

  • 感谢您的工作版本。这是一个很大的帮助。

标签: python tornado


【解决方案1】:

第一个示例中的超时处理代码对我来说是正确的。

对于测试,每个测试用例都有自己的 IOLoop,但是对于测试和它运行的任何其他东西都只有一个 IOLoop,所以你必须在这里使用 add_timeout 而不是 time.sleep() 以避免阻塞服务器.

【讨论】:

  • 谢谢本。在上面添加了一个编辑以显示我正在尝试的内容。测试超时回调运行,但处理程序中的超时回调仍未触发。我是否正确启动了 IOLoop?
  • 如果您使用@gen_test,它将为您处理启动/停止 IOLoop,因此您应该删除更新测试的最后两行。在协程中,您可能希望使用 yield gen.Task(self.io_loop.add_timeout, timedelta(seconds=1.1)) 而不是将回调传递给 add_timeout。混合协程和回调可能会很棘手(在这里,您需要用@gen.coroutine 装饰_check 并使用io_loop.add_future 来安排它)
  • 本,我很抱歉我太密集了。我进行了更改,果然,测试现在等待适当的时间。但是仍然没有调用服务端的超时回调,等待1.1秒后测试就可以对socket进行读写了。有什么想法吗?
  • 你需要使用 IOLoop.current(),而不是 IOLoop.instance()。
【解决方案2】:

Ey Ben,我知道这个问题很久以前就解决了,但我想与任何阅读本文的用户分享我为此制定的解决方案。 它基本上是基于你的,但它解决了来自外部服务的问题,该服务可以使用组合而不是继承轻松集成到任何 websocket 中:

class TimeoutWebSocketService():
    _default_timeout_delta_ms = 10 * 60 * 1000  # 10 min

    def __init__(self, websocket, ioloop=None, timeout=None):
        # Timeout
        self.ioloop = ioloop or tornado.ioloop.IOLoop.current()
        self.websocket = websocket
        self._timeout = None
        self._timeout_delta_ms = timeout or TimeoutWebSocketService._default_timeout_delta_ms

    def _close_on_timeout(self):
        self._timeout = None
        if self.websocket.ws_connection:
            self.websocket.close()

    def refresh_timeout(self, timeout=None):
        timeout = timeout or self._timeout_delta_ms
        if timeout > 0:
            # Clean last timeout, if one exists
            self.clean_timeout()

            # Add a new timeout (must be None from clean).
            self._timeout = self.ioloop.add_timeout(
                datetime.timedelta(milliseconds=timeout), self._close_on_timeout)

    def clean_timeout(self):
        if self._timeout is not None:
            # Remove previous timeout, if one exists.
            self.ioloop.remove_timeout(self._timeout)
            self._timeout = None

为了使用该服务,它很容易创建一个新的 TimeoutWebService 实例(可选地以毫秒为单位的超时,以及应该执行它的 ioloop)并调用方法 ''refresh_timeout'' 来设置第一次超时或重置已经存在的超时,或“clean_timeout”以停止超时服务。

class BaseWebSocketHandler(WebSocketHandler):
    def prepare(self):
        self.timeout_service = TimeoutWebSocketService(timeout=(1000*60))

        ## Optionally starts the service here 
        self.timeout_service.refresh_timeout()

        ## rest of prepare method 

    def on_message(self):
        self.timeout_service.refresh_timeout()

    def on_close(self):
        self.timeout_service.clean_timeout()

多亏了这种方法,您可以准确地控制要重新启动超时的时间和条件,这可能因应用程序而异。例如,如果用户满足 X 条件,或者消息是预期的,您可能只想刷新超时。

我希望大家喜欢这个解决方案!

【讨论】:

    猜你喜欢
    • 2017-06-20
    • 2017-02-26
    • 2018-10-20
    • 1970-01-01
    • 2019-01-13
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2017-08-27
    相关资源
    最近更新 更多